ホームページ > バックエンド開発 > Python チュートリアル > 無数の映画と数百万のストリーミング リンクのデータ パイプライン

無数の映画と数百万のストリーミング リンクのデータ パイプライン

Patricia Arquette
リリース: 2024-12-27 15:02:10
オリジナル
986 人が閲覧しました

2023 年 2 月: 映画、テレビ番組のすべてのスコアとそれらのストリーミング先を 1 ページで確認したいと考えていましたが、自分に関連するすべてのソースを含むアグリゲーターが見つかりませんでした。

2023 年 3 月: そこで、その場でスコアを取得する MVP を構築し、サイトをオンラインにしました。動作しましたが、遅かったです (スコアの表示に 10 秒かかりました)。

2023 年 10 月: データを自分側に保存することが必要であることに気づき、windmill.dev を発見しました。少なくとも私のニーズに関しては、同様のオーケストレーション エンジンを簡単に上回ります。


今日は、12 か月間継続的にデータを読み込んだ後、パイプラインがどのように動作するかを詳しく共有したいと思います。さまざまなソースからデータを取得し、データを正規化し、クエリ用に最適化された形式に結合する複雑なシステムを構築する方法を学びます。

写真か起こらなかったか!

A Data Pipeline for illion movies and million streaming links

これは実行ビューです。すべてのドットはフローの実行を表します。フローは何でも構いません。たとえば、単純なワンステップ スクリプトです。

A Data Pipeline for illion movies and million streaming links

中央のブロックには次のようなスクリプトが含まれています (簡略化):

def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]
ログイン後にコピー
ログイン後にコピー

次の beast もフローです (これは緑色の点の 1 つにすぎないことに注意してください):

A Data Pipeline for illion movies and million streaming links

(高解像度画像: https://i.imgur.com/LGhTUGG.png)

これを詳しく見てみましょう:

  1. 次に優先される映画またはテレビ番組を入手します (次のセクションを参照)
  2. TMDB から最新データを取得します
  3. IMDbMetacriticRotten Tomatoes をスクレイピングして現在のスコアを取得します
  4. テレビの比喩を集めて...比喩を探します
  5. Huggingface API DNA データを収集します (以下で説明します)
  6. DNA データの高次元 ベクトルを保存
  7. 映画、番組、ストリーミング リンクのリレーショナル データを保存

これらの各手順は多かれ少なかれ複雑で、非同期プロセスの使用が含まれます。

どこから始めますか?優先キュー

次にどのタイトルを選択するかを決定するために、並行して処理される 2 つのレーンがあります。ここも風車が輝くエリアです。並列化とオーケストレーションは、そのアーキテクチャで完璧に機能します。

次のアイテムを選択するための 2 つのレーンは次のとおりです:

レーン 1: 各データ ソースの個別のフロー

まず、データが添付されていないタイトルがデータ ソースごとに選択されます。つまり、Metacritic パイプラインにまだスクレイピングされていないムービーがある場合、それが次に選択されます。これにより、新しいタイトルを含め、すべてのタイトルが少なくとも 1 回処理されるようになります。

すべてのタイトルにデータが添付されると、パイプラインは最も新しいデータを持つタイトルを選択します

これはそのようなフロー実行の例です。ここではレート制限に達したためエラーが発生しています:

A Data Pipeline for illion movies and million streaming links

Windmill を使用すると、フロー内の各ステップの再試行を簡単に定義できます。この場合、エラーが発生した場合に 3 回再試行するというロジックになります。レート制限に達しない限り (通常は別のステータス コードまたはエラー メッセージ)、直ちに停止します。

レーン 2: 各映画/番組ごとの優先フロー

上記は機能しますが、深刻な問題があります。最近のリリースは適切なタイミングで更新されていません。すべてのデータ側面が正常に取得されるまでには、数週間、場合によっては数か月かかる場合があります。たとえば、映画に最近の IMDb スコアが含まれているにもかかわらず、他のスコアが古く、ストリーミング リンクが完全に欠落している場合があります。特にスコアとストリーミングの可用性については、さらに高い精度を達成したいと考えていました。

この問題を解決するために、2 番目のレーンは別の優先順位付け戦略に焦点を当てています。最も人気のあるトレンドの映画/番組が選択され、すべてのデータ ソースにわたって完全なデータ更新が行われます。 このフローは以前に示しました。それは私が以前にと呼んだものです。

アプリでより頻繁に表示されるタイトルの優先順位も上がります。つまり、映画や番組が検索結果の上位に表示されるたび、または詳細ビューが開かれるたびに、すぐに更新される可能性があります。

すべてのタイトルは、優先レーンを使用して 週に 1 回のみ更新できます。これにより、その間に変更されていない可能性が高いデータが取得されないようにすることができます。

こんなことしてもいいですか?スクレイピングに関する考慮事項

「スクレイピングは合法ですか?」と疑問に思うかもしれません。データを取得する行為は通常は問題ありません。ただし、データをどう扱うかは慎重に検討する必要があります。 スクレイピングされたデータを使用するサービスから利益を得た時点で、その利用規約に違反している可能性があります。 (Web スクレイピングの法的状況と「スクレイピング」は単なる自動化されたアクセスであり、誰もがそれを行っているを参照) )

スクレイピングと関連法は新しいものであり、テストされていないことが多く、法的なグレーゾーンがたくさんあります。私は、サービスへの影響を最小限に抑えるために、すべての情報源を適宜引用し、レート制限を尊重し、不必要なリクエストを避けることを決意しています。

実際のところ、データは利益を得るために使用されるものではありません。 GoodWatch は誰でも永久に無料で使用できます。

さらに仕事をしますか?はい、ミロードさん

Windmill はワーカーを使用して、コードの実行を複数のプロセスに分散します。 フローの各ステップはワーカーに送信されるため、実際のビジネス ロジックから独立します。 メイン アプリのみがジョブを調整し、ワーカーは入力データ、実行するコード、結果を受け取るだけです。

これは、適切に拡張できる効率的なアーキテクチャです。現在は12名で分担して業務を行っています。これらはすべて Hetzner でホストされています。

各ワーカーの最大リソース消費量は 1 vCPU と 2 GB の RAM です。概要は次のとおりです:

A Data Pipeline for illion movies and million streaming links

風車編集者

Windmill は、リンティング自動フォーマットAI アシスタント、さらには 共同編集 を備えたブラウザ内 IDE のようなエディター エクスペリエンスを提供します。 🎜> (最後の機能は有料機能です)。ただし、最も優れているのはこのボタンです:

A Data Pipeline for illion movies and million streaming links

スクリプトをデプロイする前に、スクリプトを迅速に繰り返してテストすることができます。私は通常、ブラウザーでファイルを編集してテストし、完了したら git にプッシュします。

最適なコーディング環境に欠けている唯一のものは、デバッグ ツール (ブレークポイントと変数コンテキスト) です。現在、この弱点を克服するためにローカル IDE でスクリプトをデバッグしています。

数字。ナンバーズが好きです

私もです!

現在、GoodWatch には約 100 GB の永続データ ストレージが必要です:

  • 生の前処理データ (MongoDB) 用に 15 GB
  • 処理されたリレーショナル データ (Postgres) 用に 23 GB
  • 67 GB ベクトル データ (Postgres)

毎日 6,500 のフロー が Windmill のオーケストレーション エンジンを介して実行されます。これにより、1 日あたりの量は次のようになります:

  • 30,000 IMDb ページ
  • 9,000 テレビの比喩ページ
  • 5,000 ロッテン トマト ページ
  • 1,500 ハグフェイスプロンプト
  • 600 メタクリティック ページ

レート制限ポリシーが異なるため、これらの数値は根本的に異なります。

1 日に 1 回、データがクリーンアップされ、最終的なデータ形式に結合されます。現在、GoodWatch Web アプリ ストアを強化するデータベース:

  • 1,000 万 ストリーミング リンク
  • 100 万本 映画
  • 300,000 DNA 値
  • 200,000 テレビ番組
  • DNA が含まれる 70,000 映画/番組

あなたがいつも話しているその DNA とは何ですか?

映画をジャンルでしか区別できないと想像してみてください。非常に制限されていますよね?

それが私が DNA プロジェクトを始めた理由です。 ムードプロット要素キャラクタータイプダイアログ、または キープロップなどの他の属性によって映画や番組を分類できます。 .

全アイテムの DNA 値のトップ 10 は次のとおりです:

A Data Pipeline for illion movies and million streaming links

これにより次の 2 つのことが可能になります:

  1. DNA 値によるフィルター (リレーショナル データを使用)
  2. 類似度による検索(ベクトルデータを使用)

例:

  • メランコリックな気分
  • デューンと同様のストーリー: パート 2

将来、DNA についてさらに多くの詳細を記載した専用のブログ投稿が作成される予定です。

データパイプラインをさらに深く掘り下げる

データ パイプラインがどのように機能するかを完全に理解するために、各データ ソースで何が起こるかを以下に詳しく説明します。

1. 1 日に 1 回、MongoDB コレクションがすべての必要な入力データで更新されます。

データ ソースごとに、必要なすべてのデータを含む MongoDB コレクションを準備する ìnit フローがあります。 IMDb の場合、それは単なる imdb_id です。 Rotten Tomatoes の場合、タイトルとリリース年は必須です。これは、ID が不明であり、名前に基づいて正しい URL を推測する必要があるためです。

2. 継続的にデータをフェッチし、MongoDB コレクションに書き込みます。

上で説明した優先順位の選択に基づいて、準備されたコレクション内の項目が、フェッチされたデータで更新されます。各データ ソースには独自のコレクションがあり、時間の経過とともにますます完成していきます。

3. 1 日に 1 回、さまざまなフローが MongoDB コレクションからデータを収集し、Postgres に書き込みます。

映画用のフロー、テレビ番組用のフロー、ストリーミング リンク用のフローがあります。これらは、さまざまなコレクションから必要なデータをすべて収集し、それぞれの Postgres テーブルに保存し、Web アプリケーションによってクエリされます。

これはコピームービーのフローとスクリプトの抜粋です:

A Data Pipeline for illion movies and million streaming links

これらのフローの中には、実行に長い時間がかかるもの、場合によっては 6 時間以上かかるものもあります。これは、データ セット全体をバッチ処理するのではなく、更新されたすべての項目にフラグを付け、それらのみをコピーすることで最適化できます。私のリストにある多くの TODO 項目のうちの 1 つ ?

スケジュール設定

スケジューリングは、自動的に実行する必要がある各フローまたはスクリプトの cron 式を定義するのと同じくらい簡単です。

A Data Pipeline for illion movies and million streaming links

ここでは、GoodWatch に定義されているすべてのスケジュールの抜粋を示します:

A Data Pipeline for illion movies and million streaming links

合計で約 50 のスケジュールが定義されています。

課題

優れたデータには大きな責任が伴います。多くのことがうまくいかない可能性があります。そしてそれは実現しました。

処理が非常に遅い

私のスクリプトの初期バージョンでは、コレクションまたはテーブル内のすべてのエントリを更新するのに時間がかかりました。それは、すべての項目を個別に更新挿入したためです。これにより、多くのオーバーヘッドが発生し、プロセスが大幅に遅くなります。

より良いアプローチは、更新/挿入するデータを収集し、データベース クエリをバッチ処理することです。以下は MongoDB の例です:

def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]
ログイン後にコピー
ログイン後にコピー

メモリを大量に消費するスクリプト

バッチ処理であっても、一部のスクリプトはワーカーがクラッシュするほど大量のメモリを消費しました。解決策は、ユースケースごとにバッチ サイズを慎重に微調整することでした。

一部のバッチは 5000 ステップで実行できますが、他のバッチはメモリにはるかに多くのデータを保存し、500 ステップでより適切に実行されます。

Windmill には、スクリプトの実行中にメモリを観察する優れた機能があります。

A Data Pipeline for illion movies and million streaming links

重要なポイント

Windmill は、タスクを自動化するための開発者のツールキットに含まれる優れた資産です。これは私にとって非常に貴重な生産性の向上であり、タスク オーケストレーション、エラー処理、再試行、キャッシュといった重労働をアウトソーシングしながら、フロー構造とビジネス ロジックに集中できるようになりました。

大量のデータの処理は依然として困難であり、パイプラインの最適化は進行中のプロセスです。しかし、これまでのところすべてがうまくいっていることに本当に満足しています。

分かった、分かった。それで十分です

そう思いました。いくつかのリソースをリンクさせてください。これで完了です:

  • グッドウォッチ
  • GoodWatch Discord コミュニティ
  • 風車
  • Windmill Discord コミュニティ

GoodWatch はオープンソースであることをご存知ですか?このリポジトリですべてのスクリプトとフロー定義を確認できます: https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f

ご質問がございましたら、お知らせください。

以上が無数の映画と数百万のストリーミング リンクのデータ パイプラインの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート