目前,我们生活在一个每秒生成千万亿字节数据的世界。因此,对于希望随着数据的产生而更准确地生成业务洞察的公司来说,实时分析和处理这些数据变得非常重要。
今天,我们将使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。如果您不知道这些技术是什么,我建议您阅读我撰写的文章,更详细地介绍它们,以及本文将涵盖的其他概念。所以,别忘了检查一下吗?.
你可以在我的 GitHub 上查看完整的项目。
好吧,想象一下,您是一名数据工程师,在一家名为 SkyX 的航空公司工作,该航空公司每秒都会生成有关空中交通的数据。
您被要求开发一个仪表板来显示这些航班的实时数据,例如国外访问量最大的城市的排名;大多数人离开的城市;以及在世界各地运送最多乘客的飞机。
这是每次航班生成的数据:
以下是我们项目的基本架构:
本教程假设您的计算机上已经安装了 PySpark。如果您还没有这样做,请查看文档本身中的步骤。
至于Apache Kafka,我们将通过Docker容器化来使用它??.
最后,我们将通过虚拟环境使用 Python。
话不多说,创建一个名为 skyx 的文件夹,并在其中添加文件 docker-compose.yml。
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
现在,在 docker-compose 文件中添加以下内容:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我们现在可以上传我们的 Kafka 服务器。为此,请在终端中键入以下命令:
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
注意:本教程使用 Docker Compose 2.0 版本。这就是为什么 docker 和 compose ☺ 之间没有“-”。
现在,我们需要在 Kafka 中创建一个主题,用于存储生产者实时发送的数据。为此,让我们访问容器内的 Kafka:
$ docker compose 执行 kafka bash
最后创建主题,名为airtraffic。
$ kafka-topics --create --topic Airtraffic --bootstrap-server localhost:29092
创建了主题airtraffic。
为了开发我们的生产者,即负责将实时空中交通数据发送到 Kafka 主题的应用程序,我们需要使用 kafka-python 库。 kafka-python 是一个社区开发的库,允许我们开发与 Apache Kafka 集成的生产者和消费者。
首先,让我们创建一个名为 requirements.txt 的文件,并在其中添加以下依赖项:
卡夫卡-python
其次,我们将创建一个虚拟环境并在requirements.txt文件中安装依赖项:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
完成!现在我们的环境已经准备好开发了吗?.
现在让我们创建我们的生产者。如前所述,生产者将负责将空中交通数据发送到新创建的 Kafka 主题。
正如架构中所说,SkyX 只在全球五个城市之间飞行,并且只有五架飞机?。值得一提的是,每架飞机可搭载50至100人。
注意,数据是随机生成的,以json格式发送到主题,时间间隔在1到6秒之间?.
我们走吧!创建一个名为 src 的子目录和另一个名为 kafka 的子目录。在 kafka 目录中,创建一个名为 airtraffic_ Producer.py 的文件,并在其中添加以下代码:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
完成!我们发展我们的生产者。运行它并让它运行一段时间。
$ python Airtraffic_ Producer.py
现在让我们开发我们的消费者。这将是一个非常简单的应用程序。它只是在终端中实时显示到达kafka主题的数据。
仍在 kafka 目录中,创建一个名为 airtraffic_consumer.py 的文件,并在其中添加以下代码:
$ docker compose up -d $ docker compose ps
看,我告诉过你这很简单。运行它并观察生产者向主题发送数据时实时显示的数据。
$ python airtraffic_consumer.py
现在我们开始数据分析。此时,我们将开发一个仪表板,一个应用程序,它将实时显示接待游客最多的城市的排名。换句话说,我们将按 to 列对数据进行分组,并根据 passengers 列进行求和。非常简单!
为此,请在 src 目录中创建一个名为 dashboards 的子目录,并创建一个名为 tourists_analysis.py 的文件。然后在其中添加以下代码:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
我们现在可以通过spark-submit执行我们的文件。但冷静点!当我们将 PySpark 与 Kafka 集成时,我们必须以不同的方式运行 Spark-Submit。需要通过 --packages.
参数告知 Apache Kafka 包以及 Apache Spark 当前版本如果这是您第一次将 Apache Spark 与 Apache Kafka 集成,spark-submit 可能需要一段时间才能运行。这是因为它需要下载必要的软件包。
确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:
$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0游客分析.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
这个分析与上一个非常相似。然而,我们不会实时分析接待游客最多的城市,而是分析最多人离开的城市。为此,创建一个名为 leavers_analysis.py 的文件,并在其中添加以下代码:
$ docker compose up -d $ docker compose ps
确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:
$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 Leavers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
这个分析比前面的简单多了。让我们实时分析在世界各地城市之间运送最多乘客的飞机。创建一个名为 aircrafts_analysis.py 的文件,并在其中添加以下代码:
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
确保生产者仍在运行,以便我们可以实时看到数据分析。在仪表板目录中,运行以下命令:
$ Spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 heavens_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
我们就到此结束了,伙计们!在本文中,我们使用 Spark Structured Streaming 和 Apache Kafka 开发基于虚构空中交通数据的实时数据分析。
为此,我们开发了一个生产者,将这些数据实时发送到 Kafka 主题,然后我们开发了 3 个仪表板来实时分析这些数据。
我希望你喜欢它。下次见?.
以上是使用 Spark Structured Streaming 和 Apache Kafka 进行实时空中交通数据分析的详细内容。更多信息请关注PHP中文网其他相关文章!