首頁 > 後端開發 > Python教學 > 使用 Spark Structured Streaming 和 Apache Kafka 進行即時空中交通資料分析

使用 Spark Structured Streaming 和 Apache Kafka 進行即時空中交通資料分析

Mary-Kate Olsen
發布: 2024-10-29 21:55:02
原創
947 人瀏覽過

目前,我們生活在一個每秒產生千萬億位元組資料的世界。因此,對於希望隨著數據的產生而更準確地產生業務洞察的公司來說,即時分析和處理這些數據變得非常重要。

今天,我們將使用 Spark Structured Streaming 和 Apache Kafka 開發基於虛構空中交通數據的即時數據分析。如果您不知道這些技術是什麼,我建議您閱讀我撰寫的文章,更詳細地介紹它們,以及本文將涵蓋的其他概念。所以,別忘了檢查一下嗎? .

你可以在我的 GitHub 上查看完整的專案。

建築學

好吧,想像一下,您是一名數據工程師,在一家名為 SkyX 的航空公司工作,該航空公司每秒都會產生有關空中交通的數據。

您被要求開發一個儀表板來顯示這些航班的即時數據,例如國外訪問量最大的城市的排名;大多數人離開的城市;以及在世界各地運送最多乘客的飛機。

這是每次航班產生的數據:

  • aircraft_name:飛機的名稱。在 SkyX,只有五架飛機可用。
  • 出發地:飛機起飛的城市。 SkyX 只營運全球五個城市之間的航班​​。
  • 目的地:飛機目的地城市。如前所述,SkyX 僅運營全球五個城市之間的航班​​。
  • 乘客:飛機運送的乘客數量。所有 SkyX 飛機每次飛行可搭載 50 至 100 人。

以下是我們專案的基本架構:

  • 生產者:負責產生飛機空中交通資料並將其發送到 Apache Kafka 主題。
  • Consumer:只觀察即時到達Apache Kafka Topic的資料。
  • 資料分析:三個儀表板,即時處理和分析到達 Apache Kafka 主題的資料。分析接待遊客最多的城市;分析大多數人離開前往其他城市的城市;以及在世界各地城市之間運送最多乘客的 SkyX 飛機的分析。

準備開發環境

本教學假設您的電腦上已經安裝了 PySpark。如果您還沒有這樣做,請查看文件本身中的步驟。

至於Apache Kafka,我們將透過Docker容器化來使用它??.

最後,我們將透過虛擬環境使用 Python。

透過 Docker 進行容器化的 Apache Kafka

話不多說,建立一個名為 skyx 的資料夾,並在其中加入檔案 docker-compose.yml

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
登入後複製
登入後複製
登入後複製
登入後複製

現在,在 docker-compose 檔案中加入以下內容:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
登入後複製
登入後複製
登入後複製

完成!我們現在可以上傳我們的 Kafka 伺服器。為此,請在終端機中鍵入以下命令:

$ docker compose up -d
$ docker compose ps
登入後複製
登入後複製
登入後複製
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
登入後複製
登入後複製

注意:本教學使用 Docker Compose 2.0 版本。這就是為什麼 dockercompose ☺ 之間沒有「-」。

現在,我們需要在 Kafka 中建立一個主題來儲存生產者即時發送的資料。為此,讓我們訪問容器內的 Kafka:

$ docker compose 執行 kafka bash

最後建立主題,名稱為airtraffic

$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092

創建了主題airtraffic。

虛擬環境的創建

為了開發我們的生產者,即負責將即時空中交通數據發送到 Kafka 主題的應用程序,我們需要使用 kafka-python 庫。 kafka-python 是一個社群開發的函式庫,可讓我們開發與 Apache Kafka 整合的生產者和消費者。

首先,讓我們建立一個名為 requirements.txt 的文件,並在其中新增以下相依性:

卡夫卡-python

其次,我們將建立一個虛擬環境並在requirements.txt檔案中安裝依賴項:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
登入後複製
登入後複製
登入後複製
登入後複製

完成!現在我們的環境已經準備好開發了嗎? .

生產者發展

現在讓我們創建我們的生產者。如前所述,生產者將負責將空中交通數據發送到新創建的 Kafka 主題。

如架構所說,SkyX 只在全球五個城市之間飛行,並且只有五架飛機? 。值得一提的是,每架飛機可搭載50至100人。

注意,資料是隨機產生的,以json格式傳送到主題,時間間隔在1到6秒之間? .

我們走吧!建立一個名為 src 的子目錄和另一個名為 kafka 的子目錄。在 kafka 目錄中,建立一個名為 airtraffic_ Producer.py 的文件,並在其中新增以下程式碼:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
登入後複製
登入後複製
登入後複製

完成!我們發展我們的生產者。運行它並讓它運行一段時間。

$ python Airtraffic_ Producer.py

消費者發展

現在讓我們開發我們的消費者。這將是一個非常簡單的應用程式。它只是在終端機中即時顯示到達kafka topic的資料。

仍在 kafka 目錄中,建立一個名為 airtraffic_consumer.py 的文件,並在其中添加以下程式碼:

$ docker compose up -d
$ docker compose ps
登入後複製
登入後複製
登入後複製

看,我告訴你這很簡單。運行它並觀察生產者向主題發送資料時即時顯示的資料。

$ python airtraffic_consumer.py

數據分析:接待遊客最多的城市

現在我們開始進行數據分析。此時,我們將開發一個儀表板,一個應用程序,它將即時顯示接待遊客最多的城市的排名。換句話說,我們將按 to 列對資料進行分組,並根據 passengers 列進行求和。非常簡單!

為此,請在 src 目錄中建立一個名為 dashboards 的子目錄,並建立一個名為 tourists_analysis.py 的檔案。然後在其中加入以下程式碼:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
登入後複製
登入後複製
登入後複製
登入後複製

我們現在可以透過spark-submit執行我們的檔案。但冷靜點!當我們將 PySpark 與 Kafka 整合時,我們必須以不同的方式運行 Spark-Submit。需要透過 --packages.

參數告知 Apache Kafka 套件以及 Apache Spark 目前版本

如果這是您第一次將 Apache Spark 與 Apache Kafka 集成,spark-submit 可能需要一段時間才能運行。這是因為它需要下載必要的軟體包。

確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0遊客分析.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
登入後複製
登入後複製
登入後複製

數據分析:最多人離開的城市

這個分析與上一個非常相似。然而,我們不會即時分析接待遊客最多的城市,而是分析最多人離開的城市。為此,請建立一個名為 leavers_analysis.py 的文件,並在其中加入以下程式碼:

$ docker compose up -d
$ docker compose ps
登入後複製
登入後複製
登入後複製

確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:

$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
登入後複製
登入後複製

數據分析:載客量最多的飛機

這個分析比前面的簡單多了。讓我們即時分析在世界各地城市之間運送最多乘客的飛機。建立一個名為 aircrafts_analysis.py 的文件,並在其中加入以下程式碼:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
登入後複製

確保生產者仍在運行,以便我們可以即時看到數據分析。在儀表板目錄中,執行以下命令:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
登入後複製
登入後複製
登入後複製
登入後複製

最後的考慮因素

我們就到此結束了,夥伴們!在本文中,我們使用 Spark Structured Streaming 和 Apache Kafka 開發基於虛構空中交通資料的即時資料分析。

為此,我們開發了一個生產者,將這些數據即時發送到 Kafka 主題,然後我們開發了 3 個儀表板來即時分析這些數據。

我希望你喜歡它。下次見? .

以上是使用 Spark Structured Streaming 和 Apache Kafka 進行即時空中交通資料分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板