Derzeit leben wir in einer Welt, in der jede Sekunde Petabytes an Daten generiert werden. Daher wird die Analyse und Verarbeitung dieser Daten in Echtzeit für ein Unternehmen, das Geschäftseinblicke präziser generieren möchte, während immer mehr Daten produziert werden, mehr als unerlässlich.
Heute werden wir eine Echtzeit-Datenanalyse basierend auf fiktiven Flugverkehrsdaten mithilfe von Spark Structured Streaming und Apache Kafka entwickeln. Wenn Sie nicht wissen, was diese Technologien sind, empfehle ich Ihnen, meinen Artikel zu lesen, in dem ich sie ausführlicher vorstelle, sowie andere Konzepte, die in diesem Artikel behandelt werden. Vergessen Sie also nicht, es sich anzusehen?.
Sie können sich das komplette Projekt auf meinem GitHub ansehen.
Stellen Sie sich vor, Sie, ein Dateningenieur, arbeiten bei einer Fluggesellschaft namens SkyX, wo jede Sekunde Daten über den Flugverkehr generiert werden.
Sie wurden gebeten, ein Dashboard zu entwickeln, das Echtzeitdaten dieser Flüge anzeigt, beispielsweise eine Rangliste der am häufigsten besuchten Städte im Ausland; die Städte, die die meisten Menschen verlassen; und die Flugzeuge, die die meisten Menschen um die Welt transportieren.
Dies sind die Daten, die bei jedem Flug generiert werden:
Es folgt die Grundarchitektur unseres Projekts:
In diesem Tutorial wird davon ausgegangen, dass Sie PySpark bereits auf Ihrem Computer installiert haben. Wenn Sie es noch nicht getan haben, sehen Sie sich die Schritte in der Dokumentation selbst an.
Was Apache Kafka betrifft, werden wir es durch Containerisierung über Docker nutzen??.
Und schließlich werden wir Python über eine virtuelle Umgebung verwenden.
Erstellen Sie ohne weiteres einen Ordner mit dem Namen skyx und fügen Sie darin die Datei docker-compose.yml hinzu.
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Fügen Sie nun den folgenden Inhalt in die Docker-Compose-Datei ein:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Fertig! Wir können jetzt unseren Kafka-Server hochladen. Geben Sie dazu den folgenden Befehl in das Terminal ein:
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Hinweis: Dieses Tutorial verwendet Version 2.0 von Docker Compose. Aus diesem Grund gibt es kein „-“ zwischen docker und compose ☺.
Jetzt müssen wir in Kafka ein Thema erstellen, das die vom Produzenten in Echtzeit gesendeten Daten speichert. Dazu greifen wir im Container auf Kafka zu:
$ docker compose exec kafka bash
Und schließlich erstellen Sie das Thema mit dem Namen Flugverkehr.
$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092
Thema Flugverkehr erstellt.
Um unseren Produzenten zu entwickeln, also die Anwendung, die für das Senden von Echtzeit-Flugverkehrsdaten an das Kafka-Thema verantwortlich ist, müssen wir die Kafka-Python-Bibliothek verwenden. kafka-python ist eine von der Community entwickelte Bibliothek, die es uns ermöglicht, Produzenten und Konsumenten zu entwickeln, die sich in Apache Kafka integrieren lassen.
Erstellen wir zunächst eine Datei namens requirements.txt und fügen darin die folgende Abhängigkeit hinzu:
Kafka-Python
Zweitens erstellen wir eine virtuelle Umgebung und installieren die Abhängigkeiten in der Datei „requirements.txt“:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Fertig! Jetzt ist unsere Umgebung bereit für die Entwicklung?.
Jetzt erstellen wir unseren Produzenten. Wie bereits erwähnt, wird der Produzent dafür verantwortlich sein, Flugverkehrsdaten an das neu erstellte Kafka-Topic zu senden.
Wie auch in der Architektur gesagt wurde, fliegt SkyX nur zwischen fünf Städten auf der ganzen Welt und verfügt nur über fünf Flugzeuge? Erwähnenswert ist, dass jedes Flugzeug zwischen 50 und 100 Personen befördert.
Beachten Sie, dass die Daten zufällig generiert und im JSON-Format in einem Zeitintervall zwischen 1 und 6 Sekunden an das Thema gesendet werden?.
Los geht's! Erstellen Sie ein Unterverzeichnis mit dem Namen src und ein weiteres Unterverzeichnis mit dem Namen kafka. Erstellen Sie im Kafka-Verzeichnis eine Datei mit dem Namen airtraffic_producer.py und fügen Sie darin den folgenden Code hinzu:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Fertig! Wir entwickeln unseren Produzenten. Führen Sie es aus und lassen Sie es eine Weile laufen.
$ python airtraffic_producer.py
Jetzt entwickeln wir unseren Verbraucher. Dies wird eine sehr einfache Anwendung sein. Es werden lediglich die im Kafka-Topic eingehenden Daten in Echtzeit im Terminal angezeigt.
Erstellen Sie noch im Kafka-Verzeichnis eine Datei mit dem Namen airtraffic_consumer.py und fügen Sie darin den folgenden Code hinzu:
$ docker compose up -d $ docker compose ps
Sehen Sie, ich habe Ihnen gesagt, dass es sehr einfach ist. Führen Sie es aus und beobachten Sie die Daten, die in Echtzeit angezeigt werden, während der Produzent Daten an das Thema sendet.
$ python airtraffic_consumer.py
Jetzt beginnen wir mit unserer Datenanalyse. An dieser Stelle werden wir ein Dashboard entwickeln, eine Anwendung, die in Echtzeit eine Rangliste der Städte anzeigt, die die meisten Touristen empfangen. Mit anderen Worten: Wir gruppieren die Daten nach der Spalte bis und bilden eine Summe basierend auf der Spalte Passagiere. Ganz einfach!
Erstellen Sie dazu im src-Verzeichnis ein Unterverzeichnis mit dem Namen dashboards und eine Datei mit dem Namen tourists_analysis.py. Fügen Sie dann den folgenden Code hinzu:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Und wir können unsere Datei jetzt über Spark-Submit ausführen. Aber beruhige dich! Wenn wir PySpark mit Kafka integrieren, müssen wir spark-submit anders ausführen. Es ist notwendig, das Apache Kafka-Paket und die aktuelle Version von Apache Spark über den Parameter --packages.
zu informierenWenn Sie Apache Spark zum ersten Mal mit Apache Kafka integrieren, kann die Ausführung von spark-submit eine Weile dauern. Dies liegt daran, dass die erforderlichen Pakete heruntergeladen werden müssen.
Stellen Sie sicher, dass der Producer noch läuft, damit wir die Datenanalyse in Echtzeit sehen können. Führen Sie im Dashboards-Verzeichnis den folgenden Befehl aus:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0touristen_analysis.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Diese Analyse ist der vorherigen sehr ähnlich. Anstatt jedoch in Echtzeit die Städte zu analysieren, die die meisten Touristen empfangen, werden wir die Städte analysieren, in denen die meisten Menschen abreisen. Erstellen Sie dazu eine Datei mit dem Namen leavers_analysis.py und fügen Sie darin den folgenden Code hinzu:
$ docker compose up -d $ docker compose ps
Stellen Sie sicher, dass der Producer noch läuft, damit wir die Datenanalyse in Echtzeit sehen können. Führen Sie im Dashboards-Verzeichnis den folgenden Befehl aus:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Diese Analyse ist viel einfacher als die vorherigen. Lassen Sie uns in Echtzeit analysieren, welche Flugzeuge die meisten Passagiere zwischen Städten auf der ganzen Welt befördern. Erstellen Sie eine Datei mit dem Namen aircrafts_analysis.py und fügen Sie darin den folgenden Code hinzu:
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
Stellen Sie sicher, dass der Producer noch läuft, damit wir die Datenanalyse in Echtzeit sehen können. Führen Sie im Dashboards-Verzeichnis den folgenden Befehl aus:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Aircrafts_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Und wir sind hier fertig, Leute! In diesem Artikel entwickeln wir eine Echtzeit-Datenanalyse basierend auf fiktiven Flugverkehrsdaten mit Spark Structured Streaming und Apache Kafka.
Dazu haben wir einen Produzenten entwickelt, der diese Daten in Echtzeit an das Kafka-Thema sendet, und dann haben wir 3 Dashboards entwickelt, um diese Daten in Echtzeit zu analysieren.
Ich hoffe, es hat dir gefallen. Bis zum nächsten Mal?.
Das obige ist der detaillierte Inhalt vonEchtzeit-Flugverkehrsdatenanalyse mit Spark Structured Streaming und Apache Kafka. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!