ホームページ > バックエンド開発 > Python チュートリアル > Spark Structured Streaming と Apache Kafka を使用したリアルタイムの航空交通データ分析

Spark Structured Streaming と Apache Kafka を使用したリアルタイムの航空交通データ分析

Mary-Kate Olsen
リリース: 2024-10-29 21:55:02
オリジナル
946 人が閲覧しました

現在、私たちは毎秒ペタバイトのデータが生成される世界に住んでいます。そのため、データが生成され、さらに多くのデータが生成されるにつれて、ビジネス上の洞察をより正確に生成したいと考えている企業にとって、このデータをリアルタイムで分析および処理することは、ますます重要になります。

今日は、Spark Structured Streaming と Apache Kafka を使用して、架空の航空交通データに基づくリアルタイム データ分析を開発します。これらのテクノロジーが何であるかわからない場合は、この記事で取り上げる他の概念と同様に、それらのテクノロジーをより詳しく紹介するために私が書いた記事を読むことをお勧めします。ぜひチェックしてみてください。

私の GitHub で完全なプロジェクトをチェックアウトできます。

建築

データ エンジニアであるあなたが、航空交通に関するデータが毎秒生成される SkyX という航空会社で働いていると想像してください。

あなたは、海外で最も訪問される都市のランキングなど、これらのフライトからのリアルタイム データを表示するダッシュボードを開発するように依頼されました。ほとんどの人が離れる都市。そして世界中で最も多くの人を輸送する航空機です。

これは各フライトで生成されるデータです:

  • aircraft_name: 航空機の名前。 SkyX では、利用可能な航空機は 5 機のみです。
  • 出発地: 航空機が出発する都市。 SkyX は、世界中の 5 都市間のフライトのみを運航しています。
  • 宛先: 航空機の目的地都市。前述したように、SkyX は世界中の 5 都市間のフライトのみを運航しています。
  • 乗客: 航空機が輸送している乗客の数。すべての SkyX 航空機は、各フライトで 50 人から 100 人を乗せます。

以下は私たちのプロジェクトの基本的なアーキテクチャです:

  • プロデューサー: 航空機の航空交通データを生成し、それを Apache Kafka トピックに送信する責任があります。
  • コンシューマ: Apache Kafka トピックにリアルタイムで到着するデータのみを観察します。
  • データ分析: Apache Kafka トピックに到着するデータをリアルタイムで処理および分析する 3 つのダッシュボード。最も多くの観光客を受け入れる都市の分析。ほとんどの人が他の都市を訪れるために出発する都市の分析。そして、世界中の都市間で最も多くの人を輸送する SkyX 航空機の分析。

開発環境の準備

このチュートリアルは、マシンに PySpark がすでにインストールされていることを前提としています。まだ確認していない場合は、ドキュメント内の手順を確認してください。

Apache Kafka については、Docker?? を介してコンテナ化して使用します。

そして最後に、仮想環境を通じて Python を使用します。

Docker 経由のコンテナ化による Apache Kafka

すぐに、skyx というフォルダーを作成し、その中にファイル docker-compose.yml を追加します。

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

次に、次のコンテンツを docker-compose ファイル内に追加します。

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

完了!これで、Kafka サーバーをアップロードできるようになりました。これを行うには、ターミナルに次のコマンドを入力します:

$ docker compose up -d
$ docker compose ps
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
ログイン後にコピー
ログイン後にコピー

注: このチュートリアルでは、Docker Compose のバージョン 2.0 を使用しています。 dockercompose の間に「-」がないのはこのためです ☺.

ここで、プロデューサーによってリアルタイムで送信されたデータを保存するトピックを Kafka 内に作成する必要があります。これを行うには、コンテナ内の Kafka にアクセスしましょう:

$ docker compose exec Kafka bash

そして最後に、airtraffic というトピックを作成します。

$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092

航空交通に関するトピックを作成しました。

仮想環境の作成

プロデューサー、つまりリアルタイムの航空交通データを Kafka トピックに送信する役割を担うアプリケーションを開発するには、kafka-python ライブラリを使用する必要があります。 kafka-python は、Apache Kafka と統合するプロデューサーとコンシューマーの開発を可能にするコミュニティ開発のライブラリです。

まず、requirements.txt というファイルを作成し、その中に次の依存関係を追加しましょう。

カフカパイソン

2 番目に、仮想環境を作成し、requirements.txt ファイルに依存関係をインストールします。

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

完了!これで環境は開発の準備が整いました?

プロデューサーの育成

それでは、プロデューサーを作成しましょう。前述したように、プロデューサーは、新しく作成された Kafka トピックに航空交通データを送信する責任を負います。

アーキテクチャでも言われていましたが、SkyX は世界中の 5 つの都市間を飛行するだけで、利用可能な航空機は 5 台しかありません。各航空機には 50 人から 100 人が搭乗できることは注目に値します。

データはランダムに生成され、1 ~ 6 秒の間隔で JSON 形式でトピックに送信されることに注意してください。

行きましょう! src というサブディレクトリと、kafka という別のサブディレクトリを作成します。 kafka ディレクトリ内に、airtraffic_Producer.py というファイルを作成し、その中に次のコードを追加します。

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

完了!私たちはプロデューサーを育成します。実行して、しばらく放置します。

$ python airtraffic_Producer.py

消費者開発

それでは、コンシューマを開発しましょう。これは非常に単純なアプリケーションになります。 Kafka トピックに到着したデータをターミナルにリアルタイムで表示するだけです。

引き続き kafka ディレクトリ内に、airtraffic_consumer.py というファイルを作成し、その中に次のコードを追加します。

$ docker compose up -d
$ docker compose ps
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

ほら、とても簡単なことだと言いましたね。これを実行し、プロデューサーがトピックにデータを送信するときにリアルタイムで表示されるデータを確認します。

$ python airtraffic_consumer.py

データ分析: 観光客が最も多い都市

ここからはデータ分析を始めます。この時点で、最も観光客が多い都市のランキングをリアルタイムに表示するダッシュボード、つまりアプリケーションを開発します。つまり、データを to 列でグループ化し、passengers 列に基づいて合計を作成します。とてもシンプルです!

これを行うには、src ディレクトリ内に dashboards というサブディレクトリを作成し、tourists_analysis.py というファイルを作成します。次に、その中に次のコードを追加します:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

これで、spark-submit を通じてファイルを実行できるようになりました。でも、落ち着いてください! PySpark を Kafka と統合する場合は、spark-submit を別の方法で実行する必要があります。 --packages.

パラメータを通じて、Apache Kafka パッケージと Apache Spark の現在のバージョンを通知する必要があります。

Apache Spark を Apache Kafka と統合するのが初めての場合、spark-submit の実行には時間がかかる場合があります。これは、必要なパッケージをダウンロードする必要があるためです。

データ分析をリアルタイムで確認できるように、プロデューサーがまだ実行されていることを確認してください。ダッシュボード ディレクトリ内で、次のコマンドを実行します:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Tourism_analysis.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

データ分析: ほとんどの人が離れる都市

この分析は前の分析と非常に似ています。ただし、最も多くの観光客を受け入れる都市をリアルタイムで分析するのではなく、最も多くの人が離れる都市を分析します。これを行うには、leavers_analysis.py というファイルを作成し、その中に次のコードを追加します。

$ docker compose up -d
$ docker compose ps
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

データ分析をリアルタイムで確認できるように、プロデューサーがまだ実行されていることを確認してください。ダッシュボード ディレクトリ内で、次のコマンドを実行します:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
ログイン後にコピー
ログイン後にコピー

データ分析: 最も多くの乗客を運ぶ航空機

この分析は前の分析よりもはるかに単純です。世界中の都市間で最も多くの乗客を輸送している航空機をリアルタイムで分析してみましょう。 aircrafts_analysis.py というファイルを作成し、その中に次のコードを追加します:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
ログイン後にコピー

データ分析をリアルタイムで確認できるように、プロデューサーがまだ実行されていることを確認してください。ダッシュボード ディレクトリ内で、次のコマンドを実行します:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0航空機_分析.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

最終的な考慮事項

そして皆さん、ここで終わります!この記事では、Spark Structured Streaming と Apache Kafka を使用して、架空の航空交通データに基づくリアルタイム データ分析を開発します。

これを行うために、このデータをリアルタイムで Kafka トピックに送信するプロデューサーを開発し、その後、このデータをリアルタイムで分析するための 3 つのダッシュボードを開発しました。

気に入っていただければ幸いです。また次回お会いしましょう。

以上がSpark Structured Streaming と Apache Kafka を使用したリアルタイムの航空交通データ分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート