使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序
卡夫卡简介
- Kafka 是一个由 Apache 开发的开源分布式事件流平台。
- 最初由 LinkedIn 创建,旨在处理高吞吐量、容错和实时数据流。
- Kafka 允许系统发布和订阅记录流(消息)、处理它们并有效地存储它们。
为什么使用Kafka?
- 高吞吐量:Kafka 每秒可以处理数百万条消息。
- 容错性:Kafka是分布式的,这意味着它可以跨多个节点复制数据以确保可靠性。
- 持久性:Kafka将数据持久保存到磁盘并可以重放消息,确保消息传递的可靠性。
- 实时处理:Kafka 可以实时处理数据流,非常适合监控、分析或事件驱动系统等应用程序。
- 可扩展性:Kafka 可以通过添加更多代理来处理大量数据来轻松扩展。
- 系统解耦:Kafka充当消息传递的中间层,允许不同系统异步通信。
卡夫卡建筑
组件:
制作人:
这些是将数据/消息发送到 Kafka 的应用程序或服务。生产者将消息推送到 Kafka 中的特定主题。
主题:
主题是发布记录的类别或提要名称。主题被分区以允许可扩展性和并行性。
分区:
- 每个主题分为一个或多个分区。
- 分区使Kafka能够处理更多消息并支持 并行处理。
- 每个分区都有一个唯一的ID,并且可以存储分区的子集 主题的数据。
经纪人:
- Kafka 作为 Broker(服务器)集群运行,每个 Broker 处理数据 对于多个主题和分区。
- 代理存储和管理分区,处理读写 来自生产者和消费者的请求。
- 每个经纪商都由唯一的 ID 标识。
消费者:
消费者是从主题读取消息的应用程序或服务。
消费者订阅主题,从 Kafka 代理中提取数据。
消费群体:
- 消费者被组织成消费者组。
- 分区内的每条消息仅传递给组内的一个消费者,这可以实现多个消费者之间的负载平衡。
动物园管理员:
- ZooKeeper 管理和协调 Kafka 代理,跟踪代理、主题和分区。
- 它有助于管理分区的领导者选举并监控集群健康状况。
Kafka 用例
- 实时分析:公司使用 Kafka 实时处理和分析数据流,以用于监控系统,例如金融交易分析。
- 日志聚合:Kafka 整合来自多个服务或应用程序的日志,以进行处理、警报或存储。
- 数据管道:Kafka 用作在不同系统或服务(ETL 管道)之间传输大量数据的骨干。
- 物联网应用:Kafka 可以处理来自物联网传感器的数据流,从而实现实时分析和响应。
- 微服务通信:Kafka 作为微服务架构的可靠消息传递平台,支持异步、解耦通信。
- 实时车辆跟踪:以下示例说明了如何使用 Kafka 实时跟踪车辆。
使用 Python 演示如何在实时场景中使用 Kafka 的示例 :
拼车应用程序的位置跟踪。
为了简单起见,我们将使用 kafka-python 库创建一个生产者(以模拟发送位置更新的驱动程序)和一个消费者(以模拟处理这些位置更新的服务)。
1。设置 Kafka
确保您在本地运行 Kafka 或使用云提供商。您可以按照 Kafka 快速入门指南在本地下载并运行 Kafka。
2。安装 Kafka Python 库
您可以使用 pip 安装 Kafka Python 库:
pip install kafka-python
3。 Python Kafka Producer(模拟驱动程序位置更新)
生产者模拟驱动程序向 Kafka 主题发送位置更新(驱动程序位置)。
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
4。 Python Kafka Consumer(模拟乘车匹配服务)
消费者从司机位置主题读取位置更新并处理它们。
from kafka import KafkaConsumer import json # Kafka Consumer consumer = KafkaConsumer( 'driver-location', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', # Start from the earliest message enable_auto_commit=True, group_id='location-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserialize data from JSON ) def process_location_updates(): print("Waiting for location updates...") for message in consumer: location = message.value driver_id = location['driver_id'] latitude = location['latitude'] longitude = location['longitude'] timestamp = location['timestamp'] print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}") # Start consuming location updates process_location_updates()
说明:
生产者(发送位置更新的司机):
- 生产者将一个 JSON 对象发送到 Kafka 主题 driver-location,其中包含 driver_id、纬度、经度和时间戳等字段。
- 生产者通过每 5 秒发送一次位置数据来模拟实时 GPS 更新。
消费者(拼车服务):
- 消费者订阅驾驶员位置主题,监听更新。
- 每次将位置更新发布到 Kafka 时,消费者都会处理并打印它,模拟使用此数据来匹配司机和乘客的服务。
运行示例(我在 Windows 机器上运行):
- 启动 Zookeeper
pip install kafka-python
- 启动本地 Kafka 服务器。
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
现在使用 python 在 2 个单独的终端窗口中运行生产者和消费者。
运行生产者脚本来模拟驱动程序发送位置更新。
运行消费者脚本以查看乘车匹配服务实时处理位置更新。
结论
Apache Kafka 提供了一个用于管理实时数据流的卓越平台。通过将 Kafka 与 Python 相结合,开发人员可以构建强大的数据管道和实时分析解决方案。
无论是车辆跟踪、物联网数据还是实时仪表板,Kafka with Python 都具有高度可扩展性,可以适应各种用例。因此,开始尝试 Kafka,您将会对其在实际应用中的潜力感到惊讶!
以上是使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python在开发效率上优于C ,但C 在执行性能上更高。1.Python的简洁语法和丰富库提高开发效率。2.C 的编译型特性和硬件控制提升执行性能。选择时需根据项目需求权衡开发速度与执行效率。

每天学习Python两个小时是否足够?这取决于你的目标和学习方法。1)制定清晰的学习计划,2)选择合适的学习资源和方法,3)动手实践和复习巩固,可以在这段时间内逐步掌握Python的基本知识和高级功能。

Python和C 各有优势,选择应基于项目需求。1)Python适合快速开发和数据处理,因其简洁语法和动态类型。2)C 适用于高性能和系统编程,因其静态类型和手动内存管理。

pythonlistsarepartofthestAndArdLibrary,herilearRaysarenot.listsarebuilt-In,多功能,和Rused ForStoringCollections,而EasaraySaraySaraySaraysaraySaraySaraysaraySaraysarrayModuleandleandleandlesscommonlyusedDduetolimitedFunctionalityFunctionalityFunctionality。

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。

Python在科学计算中的应用包括数据分析、机器学习、数值模拟和可视化。1.Numpy提供高效的多维数组和数学函数。2.SciPy扩展Numpy功能,提供优化和线性代数工具。3.Pandas用于数据处理和分析。4.Matplotlib用于生成各种图表和可视化结果。
