Actuellement, nous vivons dans un monde où des péta-octets de données sont générés chaque seconde. En tant que tel, l'analyse et le traitement de ces données en temps réel deviennent plus qu'essentiels pour une entreprise cherchant à générer des informations commerciales plus précises à mesure que de plus en plus de données sont produites.
Aujourd'hui, nous allons développer une analyse de données en temps réel basée sur des données fictives du trafic aérien à l'aide de Spark Structured Streaming et d'Apache Kafka. Si vous ne savez pas ce que sont ces technologies, je vous suggère de lire mon article que j'ai écrit pour les présenter plus en détail, ainsi que d'autres concepts qui seront abordés tout au long de cet article. Alors, n'oubliez pas d'y jeter un œil ?.
Vous pouvez consulter le projet complet sur mon GitHub.
Eh bien, imaginez que vous, un ingénieur de données, travaillez dans une compagnie aérienne appelée SkyX, où des données sur le trafic aérien sont générées chaque seconde.
Il vous a été demandé de développer un tableau de bord affichant en temps réel les données de ces vols, comme par exemple un classement des villes les plus visitées à l'étranger ; les villes où partent la plupart des gens ; et l'avion qui transporte le plus de personnes dans le monde.
Voici les données générées à chaque vol :
Voici l'architecture de base de notre projet :
Ce tutoriel suppose que PySpark est déjà installé sur votre machine. Si vous ne l'avez pas déjà fait, consultez les étapes dans la documentation elle-même.
Quant à Apache Kafka, nous l'utiliserons via la conteneurisation via Docker??.
Et enfin, nous utiliserons Python via un environnement virtuel.
Sans plus tard, créez un dossier appelé skyx et ajoutez le fichier docker-compose.yml à l'intérieur.
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Maintenant, ajoutez le contenu suivant dans le fichier docker-compose :
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Fait ! Nous pouvons maintenant télécharger notre serveur Kafka. Pour cela, tapez la commande suivante dans le terminal :
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Remarque : ce tutoriel utilise la version 2.0 de Docker Compose. C'est pourquoi il n'y a pas de "-" entre docker et compose ☺.
Maintenant, nous devons créer un sujet au sein de Kafka qui stockera les données envoyées en temps réel par le producteur. Pour ce faire, accédons à Kafka à l'intérieur du conteneur :
$ docker compose exec kafka bash
Et enfin créez le sujet, appelé trafic aérien.
$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092
Création du sujet trafic aérien.
Pour développer notre producteur, c'est-à-dire l'application qui se chargera d'envoyer les données du trafic aérien en temps réel au sujet Kafka, nous devons utiliser la bibliothèque kafka-python. kafka-python est une bibliothèque développée par la communauté qui nous permet de développer des producteurs et des consommateurs qui s'intègrent à Apache Kafka.
Tout d'abord, créons un fichier appelé requirements.txt et ajoutons-y la dépendance suivante :
kafka-python
Deuxièmement, nous allons créer un environnement virtuel et installer les dépendances dans le fichier exigences.txt :
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Fait ! Notre environnement est désormais prêt pour le développement ?.
Créons maintenant notre producteur. Comme mentionné, le producteur sera chargé d'envoyer les données du trafic aérien au sujet Kafka nouvellement créé.
Comme cela a également été dit dans l'architecture, SkyX ne vole qu'entre cinq villes à travers le monde, et ne dispose que de cinq avions disponibles ?. Il est à noter que chaque avion transporte entre 50 et 100 personnes.
A noter que les données sont générées aléatoirement et envoyées au sujet au format json dans un intervalle de temps compris entre 1 et 6 secondes ?.
C'est parti ! Créez un sous-répertoire appelé src et un autre sous-répertoire appelé kafka. Dans le répertoire kafka, créez un fichier appelé airtraffic_producer.py et ajoutez-y le code suivant :
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Fait ! Nous développons notre producteur. Exécutez-le et laissez-le fonctionner pendant un moment.
$python airtraffic_producer.py
Développons maintenant notre consommateur. Ce sera une application très simple. Il affichera simplement les données arrivant dans le sujet kafka en temps réel dans le terminal.
Toujours dans le répertoire kafka, créez un fichier appelé airtraffic_consumer.py et ajoutez-y le code suivant :
$ docker compose up -d $ docker compose ps
Tu vois, je t'avais dit que c'était très simple. Exécutez-le et regardez les données qui seront affichées en temps réel au fur et à mesure que le producteur envoie des données au sujet.
$python airtraffic_consumer.py
Maintenant, nous commençons par notre analyse des données. A ce stade, nous développerons un tableau de bord, une application, qui affichera en temps réel un classement des villes qui reçoivent le plus de touristes. En d'autres termes, nous regrouperons les données par la colonne à et ferons une somme basée sur la colonne passagers. Très simple !
Pour ce faire, dans le répertoire src, créez un sous-répertoire appelé dashboards et créez un fichier appelé tourists_analysis.py. Ajoutez ensuite le code suivant à l'intérieur :
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Et nous pouvons maintenant exécuter notre fichier via spark-submit. Mais calme-toi ! Lorsque nous intégrons PySpark à Kafka, nous devons exécuter Spark-submit différemment. Il est nécessaire de renseigner le package Apache Kafka et la version actuelle d'Apache Spark via le paramètre --packages.
Si c'est la première fois que vous intégrez Apache Spark à Apache Kafka, l'exécution de spark-submit peut prendre un certain temps. En effet, il doit télécharger les packages nécessaires.
Assurez-vous que le producteur est toujours en cours d'exécution afin que nous puissions voir l'analyse des données en temps réel. Dans le répertoire des tableaux de bord, exécutez la commande suivante :
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 touristes_analysis.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Cette analyse est très similaire à la précédente. Cependant, au lieu d’analyser en temps réel les villes qui reçoivent le plus de touristes, nous analyserons les villes où partent le plus de gens. Pour ce faire, créez un fichier appelé leavers_analysis.py et ajoutez-y le code suivant :
$ docker compose up -d $ docker compose ps
Assurez-vous que le producteur est toujours en cours d'exécution afin que nous puissions voir l'analyse des données en temps réel. Dans le répertoire des tableaux de bord, exécutez la commande suivante :
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 levers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Cette analyse est beaucoup plus simple que les précédentes. Analysons en temps réel les avions qui transportent le plus de passagers entre les villes du monde. Créez un fichier appelé aircrafts_analysis.py et ajoutez-y le code suivant :
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
Assurez-vous que le producteur est toujours en cours d'exécution afin que nous puissions voir l'analyse des données en temps réel. Dans le répertoire des tableaux de bord, exécutez la commande suivante :
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 planes_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Et on termine ici, les gars ! Dans cet article, nous développons une analyse de données en temps réel basée sur des données fictives du trafic aérien à l'aide de Spark Structured Streaming et d'Apache Kafka.
Pour ce faire, nous avons développé un producteur qui envoie ces données en temps réel au sujet Kafka, puis nous avons développé 3 tableaux de bord pour analyser ces données en temps réel.
J'espère que ça vous a plu. A la prochaine fois ?.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!