2023 年 2 月: 映画、テレビ番組のすべてのスコアとそれらのストリーミング先を 1 ページで確認したいと考えていましたが、自分に関連するすべてのソースを含むアグリゲーターが見つかりませんでした。
2023 年 3 月: そこで、その場でスコアを取得する MVP を構築し、サイトをオンラインにしました。動作しましたが、遅かったです (スコアの表示に 10 秒かかりました)。
2023 年 10 月: データを自分側に保存することが必要であることに気づき、windmill.dev を発見しました。少なくとも私のニーズに関しては、同様のオーケストレーション エンジンを簡単に上回ります。
今日は、12 か月間継続的にデータを読み込んだ後、パイプラインがどのように動作するかを詳しく共有したいと思います。さまざまなソースからデータを取得し、データを正規化し、クエリ用に最適化された形式に結合する複雑なシステムを構築する方法を学びます。
これは実行ビューです。すべてのドットはフローの実行を表します。フローは何でも構いません。たとえば、単純なワンステップ スクリプトです。
中央のブロックには次のようなスクリプトが含まれています (簡略化):
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
次の beast もフローです (これは緑色の点の 1 つにすぎないことに注意してください):
(高解像度画像: https://i.imgur.com/LGhTUGG.png)
これを詳しく見てみましょう:
これらの各手順は多かれ少なかれ複雑で、非同期プロセスの使用が含まれます。
次にどのタイトルを選択するかを決定するために、並行して処理される 2 つのレーンがあります。ここも風車が輝くエリアです。並列化とオーケストレーションは、そのアーキテクチャで完璧に機能します。
次のアイテムを選択するための 2 つのレーンは次のとおりです:
まず、データが添付されていないタイトルがデータ ソースごとに選択されます。つまり、Metacritic パイプラインにまだスクレイピングされていないムービーがある場合、それが次に選択されます。これにより、新しいタイトルを含め、すべてのタイトルが少なくとも 1 回処理されるようになります。
すべてのタイトルにデータが添付されると、パイプラインは最も新しいデータを持つタイトルを選択します。
これはそのようなフロー実行の例です。ここではレート制限に達したためエラーが発生しています:
Windmill を使用すると、フロー内の各ステップの再試行を簡単に定義できます。この場合、エラーが発生した場合に 3 回再試行するというロジックになります。レート制限に達しない限り (通常は別のステータス コードまたはエラー メッセージ)、直ちに停止します。
上記は機能しますが、深刻な問題があります。最近のリリースは適切なタイミングで更新されていません。すべてのデータ側面が正常に取得されるまでには、数週間、場合によっては数か月かかる場合があります。たとえば、映画に最近の IMDb スコアが含まれているにもかかわらず、他のスコアが古く、ストリーミング リンクが完全に欠落している場合があります。特にスコアとストリーミングの可用性については、さらに高い精度を達成したいと考えていました。
この問題を解決するために、2 番目のレーンは別の優先順位付け戦略に焦点を当てています。最も人気のあるトレンドの映画/番組が選択され、すべてのデータ ソースにわたって完全なデータ更新が行われます。 このフローは以前に示しました。それは私が以前に獣と呼んだものです。
アプリでより頻繁に表示されるタイトルの優先順位も上がります。つまり、映画や番組が検索結果の上位に表示されるたび、または詳細ビューが開かれるたびに、すぐに更新される可能性があります。
すべてのタイトルは、優先レーンを使用して 週に 1 回のみ更新できます。これにより、その間に変更されていない可能性が高いデータが取得されないようにすることができます。
「スクレイピングは合法ですか?」と疑問に思うかもしれません。データを取得する行為は通常は問題ありません。ただし、データをどう扱うかは慎重に検討する必要があります。 スクレイピングされたデータを使用するサービスから利益を得た時点で、その利用規約に違反している可能性があります。 (Web スクレイピングの法的状況と「スクレイピング」は単なる自動化されたアクセスであり、誰もがそれを行っているを参照) )
スクレイピングと関連法は新しいものであり、テストされていないことが多く、法的なグレーゾーンがたくさんあります。私は、サービスへの影響を最小限に抑えるために、すべての情報源を適宜引用し、レート制限を尊重し、不必要なリクエストを避けることを決意しています。
実際のところ、データは利益を得るために使用されるものではありません。 GoodWatch は誰でも永久に無料で使用できます。
Windmill はワーカーを使用して、コードの実行を複数のプロセスに分散します。 フローの各ステップはワーカーに送信されるため、実際のビジネス ロジックから独立します。 メイン アプリのみがジョブを調整し、ワーカーは入力データ、実行するコード、結果を受け取るだけです。
これは、適切に拡張できる効率的なアーキテクチャです。現在は12名で分担して業務を行っています。これらはすべて Hetzner でホストされています。
各ワーカーの最大リソース消費量は 1 vCPU と 2 GB の RAM です。概要は次のとおりです:
Windmill は、リンティング、自動フォーマット、AI アシスタント、さらには 共同編集 を備えたブラウザ内 IDE のようなエディター エクスペリエンスを提供します。 🎜> (最後の機能は有料機能です)。ただし、最も優れているのはこのボタンです:
スクリプトをデプロイする前に、スクリプトを迅速に繰り返してテストすることができます。私は通常、ブラウザーでファイルを編集してテストし、完了したら git にプッシュします。
最適なコーディング環境に欠けている唯一のものは、デバッグ ツール (ブレークポイントと変数コンテキスト) です。現在、この弱点を克服するためにローカル IDE でスクリプトをデバッグしています。
私もです!
現在、GoodWatch には約 100 GB の永続データ ストレージが必要です:
毎日 6,500 のフロー が Windmill のオーケストレーション エンジンを介して実行されます。これにより、1 日あたりの量は次のようになります:
レート制限ポリシーが異なるため、これらの数値は根本的に異なります。
1 日に 1 回、データがクリーンアップされ、最終的なデータ形式に結合されます。現在、GoodWatch Web アプリ ストアを強化するデータベース:
映画をジャンルでしか区別できないと想像してみてください。非常に制限されていますよね?
それが私が DNA プロジェクトを始めた理由です。 ムード、プロット要素、キャラクタータイプ、ダイアログ、または キープロップなどの他の属性によって映画や番組を分類できます。 .
全アイテムの DNA 値のトップ 10 は次のとおりです:
これにより次の 2 つのことが可能になります:
例:
将来、DNA についてさらに多くの詳細を記載した専用のブログ投稿が作成される予定です。
データ パイプラインがどのように機能するかを完全に理解するために、各データ ソースで何が起こるかを以下に詳しく説明します。
データ ソースごとに、必要なすべてのデータを含む MongoDB コレクションを準備する ìnit フローがあります。 IMDb の場合、それは単なる imdb_id です。 Rotten Tomatoes の場合、タイトルとリリース年は必須です。これは、ID が不明であり、名前に基づいて正しい URL を推測する必要があるためです。
上で説明した優先順位の選択に基づいて、準備されたコレクション内の項目が、フェッチされたデータで更新されます。各データ ソースには独自のコレクションがあり、時間の経過とともにますます完成していきます。
映画用のフロー、テレビ番組用のフロー、ストリーミング リンク用のフローがあります。これらは、さまざまなコレクションから必要なデータをすべて収集し、それぞれの Postgres テーブルに保存し、Web アプリケーションによってクエリされます。
これはコピームービーのフローとスクリプトの抜粋です:
これらのフローの中には、実行に長い時間がかかるもの、場合によっては 6 時間以上かかるものもあります。これは、データ セット全体をバッチ処理するのではなく、更新されたすべての項目にフラグを付け、それらのみをコピーすることで最適化できます。私のリストにある多くの TODO 項目のうちの 1 つ ?
スケジューリングは、自動的に実行する必要がある各フローまたはスクリプトの cron 式を定義するのと同じくらい簡単です。
ここでは、GoodWatch に定義されているすべてのスケジュールの抜粋を示します:
合計で約 50 のスケジュールが定義されています。
優れたデータには大きな責任が伴います。多くのことがうまくいかない可能性があります。そしてそれは実現しました。
私のスクリプトの初期バージョンでは、コレクションまたはテーブル内のすべてのエントリを更新するのに時間がかかりました。それは、すべての項目を個別に更新挿入したためです。これにより、多くのオーバーヘッドが発生し、プロセスが大幅に遅くなります。
より良いアプローチは、更新/挿入するデータを収集し、データベース クエリをバッチ処理することです。以下は MongoDB の例です:
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
バッチ処理であっても、一部のスクリプトはワーカーがクラッシュするほど大量のメモリを消費しました。解決策は、ユースケースごとにバッチ サイズを慎重に微調整することでした。
一部のバッチは 5000 ステップで実行できますが、他のバッチはメモリにはるかに多くのデータを保存し、500 ステップでより適切に実行されます。
Windmill には、スクリプトの実行中にメモリを観察する優れた機能があります。
Windmill は、タスクを自動化するための開発者のツールキットに含まれる優れた資産です。これは私にとって非常に貴重な生産性の向上であり、タスク オーケストレーション、エラー処理、再試行、キャッシュといった重労働をアウトソーシングしながら、フロー構造とビジネス ロジックに集中できるようになりました。
大量のデータの処理は依然として困難であり、パイプラインの最適化は進行中のプロセスです。しかし、これまでのところすべてがうまくいっていることに本当に満足しています。
そう思いました。いくつかのリソースをリンクさせてください。これで完了です:
GoodWatch はオープンソースであることをご存知ですか?このリポジトリですべてのスクリプトとフロー定義を確認できます: https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f
ご質問がございましたら、お知らせください。
以上が無数の映画と数百万のストリーミング リンクのデータ パイプラインの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。