Heim > Backend-Entwicklung > Python-Tutorial > Echtzeit-Flugverkehrsdatenanalyse mit Spark Structured Streaming und Apache Kafka

Echtzeit-Flugverkehrsdatenanalyse mit Spark Structured Streaming und Apache Kafka

Mary-Kate Olsen
Freigeben: 2024-10-29 21:55:02
Original
992 Leute haben es durchsucht

Derzeit leben wir in einer Welt, in der jede Sekunde Petabytes an Daten generiert werden. Daher wird die Analyse und Verarbeitung dieser Daten in Echtzeit für ein Unternehmen, das Geschäftseinblicke präziser generieren möchte, während immer mehr Daten produziert werden, mehr als unerlässlich.

Heute werden wir eine Echtzeit-Datenanalyse basierend auf fiktiven Flugverkehrsdaten mithilfe von Spark Structured Streaming und Apache Kafka entwickeln. Wenn Sie nicht wissen, was diese Technologien sind, empfehle ich Ihnen, meinen Artikel zu lesen, in dem ich sie ausführlicher vorstelle, sowie andere Konzepte, die in diesem Artikel behandelt werden. Vergessen Sie also nicht, es sich anzusehen?.

Sie können sich das komplette Projekt auf meinem GitHub ansehen.

Architektur

Stellen Sie sich vor, Sie, ein Dateningenieur, arbeiten bei einer Fluggesellschaft namens SkyX, wo jede Sekunde Daten über den Flugverkehr generiert werden.

Sie wurden gebeten, ein Dashboard zu entwickeln, das Echtzeitdaten dieser Flüge anzeigt, beispielsweise eine Rangliste der am häufigsten besuchten Städte im Ausland; die Städte, die die meisten Menschen verlassen; und die Flugzeuge, die die meisten Menschen um die Welt transportieren.

Dies sind die Daten, die bei jedem Flug generiert werden:

  • aircraft_name: Name des Flugzeugs. Bei SkyX sind nur fünf Flugzeuge verfügbar.
  • Von: Stadt, von der das Flugzeug abfliegt. SkyX führt nur Flüge zwischen fünf Städten auf der ganzen Welt durch.
  • An: Zielstadt des Flugzeugs. Wie bereits erwähnt, führt SkyX nur Flüge zwischen fünf Städten auf der ganzen Welt durch.
  • Passagiere: Anzahl der Passagiere, die das Flugzeug befördert. Alle SkyX-Flugzeuge befördern auf jedem Flug zwischen 50 und 100 Personen.

Es folgt die Grundarchitektur unseres Projekts:

  • Produzent: verantwortlich für die Erstellung von Flugzeug-Flugverkehrsdaten und deren Übermittlung an ein Apache Kafka-Thema.
  • Verbraucher: Beobachtet nur die Daten, die in Echtzeit zum Apache Kafka-Thema eingehen.
  • Datenanalyse: drei Dashboards, die die Daten, die beim Apache Kafka-Thema ankommen, in Echtzeit verarbeiten und analysieren. Analyse der Städte, die die meisten Touristen empfangen; Analyse der Städte, die die meisten Menschen verlassen, um andere Städte zu besuchen; und Analyse der SkyX-Flugzeuge, die die meisten Menschen zwischen Städten auf der ganzen Welt transportieren.

Vorbereiten der Entwicklungsumgebung

In diesem Tutorial wird davon ausgegangen, dass Sie PySpark bereits auf Ihrem Computer installiert haben. Wenn Sie es noch nicht getan haben, sehen Sie sich die Schritte in der Dokumentation selbst an.

Was Apache Kafka betrifft, werden wir es durch Containerisierung über Docker nutzen??.

Und schließlich werden wir Python über eine virtuelle Umgebung verwenden.

Apache Kafka durch Containerisierung über Docker

Erstellen Sie ohne weiteres einen Ordner mit dem Namen skyx und fügen Sie darin die Datei docker-compose.yml hinzu.

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Fügen Sie nun den folgenden Inhalt in die Docker-Compose-Datei ein:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Fertig! Wir können jetzt unseren Kafka-Server hochladen. Geben Sie dazu den folgenden Befehl in das Terminal ein:

$ docker compose up -d
$ docker compose ps
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Nach dem Login kopieren
Nach dem Login kopieren

Hinweis: Dieses Tutorial verwendet Version 2.0 von Docker Compose. Aus diesem Grund gibt es kein „-“ zwischen docker und compose ☺.

Jetzt müssen wir in Kafka ein Thema erstellen, das die vom Produzenten in Echtzeit gesendeten Daten speichert. Dazu greifen wir im Container auf Kafka zu:

$ docker compose exec kafka bash

Und schließlich erstellen Sie das Thema mit dem Namen Flugverkehr.

$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092

Thema Flugverkehr erstellt.

Erstellung der virtuellen Umgebung

Um unseren Produzenten zu entwickeln, also die Anwendung, die für das Senden von Echtzeit-Flugverkehrsdaten an das Kafka-Thema verantwortlich ist, müssen wir die Kafka-Python-Bibliothek verwenden. kafka-python ist eine von der Community entwickelte Bibliothek, die es uns ermöglicht, Produzenten und Konsumenten zu entwickeln, die sich in Apache Kafka integrieren lassen.

Erstellen wir zunächst eine Datei namens requirements.txt und fügen darin die folgende Abhängigkeit hinzu:

Kafka-Python

Zweitens erstellen wir eine virtuelle Umgebung und installieren die Abhängigkeiten in der Datei „requirements.txt“:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Fertig! Jetzt ist unsere Umgebung bereit für die Entwicklung?.

Produzentenentwicklung

Jetzt erstellen wir unseren Produzenten. Wie bereits erwähnt, wird der Produzent dafür verantwortlich sein, Flugverkehrsdaten an das neu erstellte Kafka-Topic zu senden.

Wie auch in der Architektur gesagt wurde, fliegt SkyX nur zwischen fünf Städten auf der ganzen Welt und verfügt nur über fünf Flugzeuge? Erwähnenswert ist, dass jedes Flugzeug zwischen 50 und 100 Personen befördert.

Beachten Sie, dass die Daten zufällig generiert und im JSON-Format in einem Zeitintervall zwischen 1 und 6 Sekunden an das Thema gesendet werden?.

Los geht's! Erstellen Sie ein Unterverzeichnis mit dem Namen src und ein weiteres Unterverzeichnis mit dem Namen kafka. Erstellen Sie im Kafka-Verzeichnis eine Datei mit dem Namen airtraffic_producer.py und fügen Sie darin den folgenden Code hinzu:

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Fertig! Wir entwickeln unseren Produzenten. Führen Sie es aus und lassen Sie es eine Weile laufen.

$ python airtraffic_producer.py

Verbraucherentwicklung

Jetzt entwickeln wir unseren Verbraucher. Dies wird eine sehr einfache Anwendung sein. Es werden lediglich die im Kafka-Topic eingehenden Daten in Echtzeit im Terminal angezeigt.

Erstellen Sie noch im Kafka-Verzeichnis eine Datei mit dem Namen airtraffic_consumer.py und fügen Sie darin den folgenden Code hinzu:

$ docker compose up -d
$ docker compose ps
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Sehen Sie, ich habe Ihnen gesagt, dass es sehr einfach ist. Führen Sie es aus und beobachten Sie die Daten, die in Echtzeit angezeigt werden, während der Produzent Daten an das Thema sendet.

$ python airtraffic_consumer.py

Datenanalyse: Städte, die die meisten Touristen empfangen

Jetzt beginnen wir mit unserer Datenanalyse. An dieser Stelle werden wir ein Dashboard entwickeln, eine Anwendung, die in Echtzeit eine Rangliste der Städte anzeigt, die die meisten Touristen empfangen. Mit anderen Worten: Wir gruppieren die Daten nach der Spalte bis und bilden eine Summe basierend auf der Spalte Passagiere. Ganz einfach!

Erstellen Sie dazu im src-Verzeichnis ein Unterverzeichnis mit dem Namen dashboards und eine Datei mit dem Namen tourists_analysis.py. Fügen Sie dann den folgenden Code hinzu:

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Und wir können unsere Datei jetzt über Spark-Submit ausführen. Aber beruhige dich! Wenn wir PySpark mit Kafka integrieren, müssen wir spark-submit anders ausführen. Es ist notwendig, das Apache Kafka-Paket und die aktuelle Version von Apache Spark über den Parameter --packages.

zu informieren

Wenn Sie Apache Spark zum ersten Mal mit Apache Kafka integrieren, kann die Ausführung von spark-submit eine Weile dauern. Dies liegt daran, dass die erforderlichen Pakete heruntergeladen werden müssen.

Stellen Sie sicher, dass der Producer noch läuft, damit wir die Datenanalyse in Echtzeit sehen können. Führen Sie im Dashboards-Verzeichnis den folgenden Befehl aus:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0touristen_analysis.py

version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Datenanalyse: Städte, die die meisten Menschen verlassen

Diese Analyse ist der vorherigen sehr ähnlich. Anstatt jedoch in Echtzeit die Städte zu analysieren, die die meisten Touristen empfangen, werden wir die Städte analysieren, in denen die meisten Menschen abreisen. Erstellen Sie dazu eine Datei mit dem Namen leavers_analysis.py und fügen Sie darin den folgenden Code hinzu:

$ docker compose up -d
$ docker compose ps
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Stellen Sie sicher, dass der Producer noch läuft, damit wir die Datenanalyse in Echtzeit sehen können. Führen Sie im Dashboards-Verzeichnis den folgenden Befehl aus:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py

NAME                                COMMAND                  SERVICE             STATUS              PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka               running             9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Nach dem Login kopieren
Nach dem Login kopieren

Datenanalyse: Flugzeuge, die die meisten Passagiere befördern

Diese Analyse ist viel einfacher als die vorherigen. Lassen Sie uns in Echtzeit analysieren, welche Flugzeuge die meisten Passagiere zwischen Städten auf der ganzen Welt befördern. Erstellen Sie eine Datei mit dem Namen aircrafts_analysis.py und fügen Sie darin den folgenden Code hinzu:

$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
Nach dem Login kopieren

Stellen Sie sicher, dass der Producer noch läuft, damit wir die Datenanalyse in Echtzeit sehen können. Führen Sie im Dashboards-Verzeichnis den folgenden Befehl aus:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Aircrafts_analysis.py

$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Abschließende Überlegungen

Und wir sind hier fertig, Leute! In diesem Artikel entwickeln wir eine Echtzeit-Datenanalyse basierend auf fiktiven Flugverkehrsdaten mit Spark Structured Streaming und Apache Kafka.

Dazu haben wir einen Produzenten entwickelt, der diese Daten in Echtzeit an das Kafka-Thema sendet, und dann haben wir 3 Dashboards entwickelt, um diese Daten in Echtzeit zu analysieren.

Ich hoffe, es hat dir gefallen. Bis zum nächsten Mal?.

Das obige ist der detaillierte Inhalt vonEchtzeit-Flugverkehrsdatenanalyse mit Spark Structured Streaming und Apache Kafka. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage