Currently, we live in a world where petabytes of data are generated every second. As such, analyzing and processing this data in real time becomes essential for a company looking to generate accurate business insights as more and more data is produced.
Today, we will develop a real-time data analysis based on fictitious air traffic data using Spark Structured Streaming and Apache Kafka. If you don't know these technologies, I suggest reading my introductory article about them, which also covers other concepts that will come up throughout this article. So, don't forget to check it out!
You can check out the complete project on my GitHub.
Well, imagine that you, a data engineer, work at an airline called SkyX, where data about air traffic is generated every second.
You were asked to develop a dashboard that displays real-time data from these flights, such as a ranking of the most visited cities abroad; the cities where most people leave; and the aircraft that transport the most people around the world.
This is the data that is generated with each flight:
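Each record sent to the topic can be pictured as a small JSON document carrying the departure city, the destination city, the aircraft and the number of passengers, something like the sketch below (the city and aircraft names are hypothetical, chosen only to illustrate the format):

{
    "from": "London",
    "to": "Tokyo",
    "aircraft": "SKY-03",
    "passengers": 72
}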
Following is the basic architecture of our project:
This tutorial assumes you already have PySpark installed on your machine. If you haven't already, check out the steps in the documentation itself.
As for Apache Kafka, we will use it through containerization via Docker.
And finally, we will use Python through a virtual environment.
Without further ado, create a folder called skyx and add the file docker-compose.yml inside it.
$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Now, add the following content inside the docker-compose file:
version: '3.9'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Done! We can now bring up our Kafka server. To do this, type the following commands in the terminal:
$ docker compose up -d
$ docker compose ps
NAME               COMMAND                  SERVICE     STATUS    PORTS
skyx-kafka-1       "/etc/confluent/dock…"   kafka       running   9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1   "/etc/confluent/dock…"   zookeeper   running   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Note: This tutorial is using version 2.0 of Docker Compose. This is why there is no "-" between docker and compose ☺.
Now, we need to create a topic within Kafka that will store the data sent in real time by the producer. To do this, let's access Kafka inside the container:
$ docker compose exec kafka bash
And finally create the topic, called airtraffic.
$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092
Created topic airtraffic.
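If you want to confirm that the topic really exists, you can list the topics on the broker while still inside the container:

$ kafka-topics --list --bootstrap-server localhost:29092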
To develop our producer, that is, the application that will be responsible for sending real-time air traffic data to the Kafka topic, we need to use the kafka-python library. kafka-python is a community-developed library that allows us to develop producers and consumers that integrate with Apache Kafka.
First, let's create a file called requirements.txt and add the following dependency inside it:
kafka-python
Second, we will create a virtual environment and install the dependencies in the requirements.txt file:
$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
Done! Now our environment is ready for development.
Now let's create our producer. As mentioned, the producer will be responsible for sending air traffic data to the newly created Kafka topic.
As mentioned in the architecture, SkyX only flies between five cities around the world and has only five aircraft available. It is worth mentioning that each aircraft carries between 50 and 100 people.
Note that the data is generated randomly and sent to the topic in JSON format at a time interval of between 1 and 6 seconds.
Let's go! Create a subdirectory called src and another subdirectory called kafka. Inside the kafka directory, create a file called airtraffic_producer.py and add the following code inside it:
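The listing below is a minimal sketch of what this producer could look like with kafka-python, assuming the flight fields described earlier (from, to, aircraft and passengers); the city and aircraft names are hypothetical:

import json
import random
import time

from kafka import KafkaProducer

# Hypothetical city and aircraft names: SkyX flies between five cities
# and owns five aircraft, but the article does not name them.
CITIES = ["Sao Paulo", "London", "Tokyo", "New York", "Sydney"]
AIRCRAFTS = ["SKY-01", "SKY-02", "SKY-03", "SKY-04", "SKY-05"]

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    value_serializer=lambda value: json.dumps(value).encode("utf-8"),
)

while True:
    # Pick two different cities for departure and arrival
    origin, destination = random.sample(CITIES, 2)
    flight = {
        "from": origin,
        "to": destination,
        "aircraft": random.choice(AIRCRAFTS),
        "passengers": random.randint(50, 100),  # each aircraft carries 50 to 100 people
    }
    producer.send("airtraffic", value=flight)
    print(flight)
    time.sleep(random.randint(1, 6))  # a new flight every 1 to 6 seconds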
Done! We have developed our producer. Run it and let it run for a while.
$ python airtraffic_producer.py
Now let's develop our consumer. This will be a very simple application: it will just display, in the terminal, the data arriving in the Kafka topic in real time.
Still inside the kafka directory, create a file called airtraffic_consumer.py and add the following code inside it:
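A minimal sketch of such a consumer with kafka-python could look like this:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "airtraffic",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="latest",
    value_deserializer=lambda value: json.loads(value.decode("utf-8")),
)

# Print each flight as soon as it arrives in the topic
for message in consumer:
    print(message.value)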
See, I told you it was very simple. Run it and watch the data that will be displayed in real time as the producer sends data to the topic.
$ python airtraffic_consumer.py
Now we start with our data analysis. At this point, we will develop a dashboard, that is, an application that displays in real time a ranking of the cities that receive the most tourists. In other words, we will group the data by the to column and sum the passengers column. Very simple!
To do this, within the src directory, create a subdirectory called dashboards and create a file called tourists_analysis.py. Then add the following code inside it:
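A minimal sketch of this analysis with Spark Structured Streaming, assuming the same record fields used by the producer, could look like this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("tourists_analysis").getOrCreate()

# Assumed schema of the records sent by airtraffic_producer.py
schema = StructType([
    StructField("from", StringType()),
    StructField("to", StringType()),
    StructField("aircraft", StringType()),
    StructField("passengers", IntegerType()),
])

# Read the airtraffic topic as a stream and parse the JSON payload
flights = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "airtraffic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("flight"))
    .select("flight.*")
)

# Rank the destination cities by the total number of passengers received
tourists = (
    flights.groupBy("to")
    .sum("passengers")
    .withColumnRenamed("sum(passengers)", "total_passengers")
    .orderBy(col("total_passengers").desc())
)

# Print the updated ranking on the console after each micro-batch
query = (
    tourists.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

query.awaitTermination()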
And we can now execute our file through spark-submit. But hold on! When we are integrating PySpark with Kafka, we must run spark-submit differently: we need to specify the Apache Kafka package, matching the current version of Apache Spark, through the --packages parameter.
If this is your first time integrating Apache Spark with Apache Kafka, spark-submit may take a while to run. This is because it needs to download the necessary packages.
Make sure the producer is still running so we can see the data analysis in real time. Inside the dashboards directory, run the following command:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourists_analysis.py
This analysis is very similar to the previous one. However, instead of analyzing in real time the cities that receive the most tourists, we will analyze the cities where the most people leave. To do this, create a file called leavers_analysis.py and add the following code inside it:
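A minimal sketch, reusing the same reading logic as the previous analysis and only swapping the grouping column to from:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("leavers_analysis").getOrCreate()

# Assumed schema of the records sent by airtraffic_producer.py
schema = StructType([
    StructField("from", StringType()),
    StructField("to", StringType()),
    StructField("aircraft", StringType()),
    StructField("passengers", IntegerType()),
])

flights = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "airtraffic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("flight"))
    .select("flight.*")
)

# Same aggregation as before, but grouping by the departure city
leavers = (
    flights.groupBy("from")
    .sum("passengers")
    .withColumnRenamed("sum(passengers)", "total_passengers")
    .orderBy(col("total_passengers").desc())
)

leavers.writeStream.outputMode("complete").format("console").start().awaitTermination()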
Make sure the producer is still running so we can see the data analysis in real time. Inside the dashboards directory, run the following command:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 leavers_analysis.py
This analysis is much simpler than the previous ones. Let's analyze in real time the aircraft that transport the most passengers between cities around the world. Create a file called aircrafts_analysis.py and add the following code inside it:
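A minimal sketch, grouping by the aircraft column this time:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("aircrafts_analysis").getOrCreate()

# Assumed schema of the records sent by airtraffic_producer.py
schema = StructType([
    StructField("from", StringType()),
    StructField("to", StringType()),
    StructField("aircraft", StringType()),
    StructField("passengers", IntegerType()),
])

flights = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "airtraffic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("flight"))
    .select("flight.*")
)

# Rank the aircraft by the total number of passengers transported
aircrafts = (
    flights.groupBy("aircraft")
    .sum("passengers")
    .withColumnRenamed("sum(passengers)", "total_passengers")
    .orderBy(col("total_passengers").desc())
)

aircrafts.writeStream.outputMode("complete").format("console").start().awaitTermination()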
Make sure the producer is still running so we can see the data analysis in real time. Inside the dashboards directory, run the following command:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 aircrafts_analysis.py
And we finish here, guys! In this article, we developed a real-time data analysis based on fictitious air traffic data using Spark Structured Streaming and Apache Kafka.
To do this, we developed a producer that sends this data in real time to the Kafka topic, and then we developed 3 dashboards to analyze this data in real time.
I hope you liked it. See you next time!