首页 后端开发 Python教程 使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序

使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序

Nov 05, 2024 pm 05:41 PM

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

卡夫卡简介

  • Kafka 是一个由 Apache 开发的开源分布式事件流平台。
  • 最初由 LinkedIn 创建,旨在处理高吞吐量、容错和实时数据流。
  • Kafka 允许系统发布和订阅记录流(消息)、处理它们并有效地存储它们。

为什么使用Kafka?

  • 高吞吐量:Kafka 每秒可以处理数百万条消息。
  • 容错性:Kafka是分布式的,这意味着它可以跨多个节点复制数据以确保可靠性。
  • 持久性:Kafka将数据持久保存到磁盘并可以重放消息,确保消息传递的可靠性。
  • 实时处理:Kafka 可以实时处理数据流,非常适合监控、分析或事件驱动系统等应用程序。
  • 可扩展性:Kafka 可以通过添加更多代理来处理大量数据来轻松扩展。
  • 系统解耦:Kafka充当消息传递的中间层,允许不同系统异步通信。

卡夫卡建筑

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

组件:

制作人:
这些是将数据/消息发送到 Kafka 的应用程序或服务。生产者将消息推送到 Kafka 中的特定主题。

主题:
主题是发布记录的类别或提要名称。主题被分区以允许可扩展性和并行性。

分区:

  • 每个主题分为一个或多个分区。
  • 分区使Kafka能够处理更多消息并支持 并行处理。
  • 每个分区都有一个唯一的ID,并且可以存储分区的子集 主题的数据。

经纪人:

  • Kafka 作为 Broker(服务器)集群运行,每个 Broker 处理数据 对于多个主题和分区。
  • 代理存储和管理分区,处理读写 来自生产者和消费者的请求。
  • 每个经纪商都由唯一的 ID 标识。

消费者:

消费者是从主题读取消息的应用程序或服务。
消费者订阅主题,从 Kafka 代理中提取数据。

消费群体:

  • 消费者被组织成消费者组。
  • 分区内的每条消息仅传递给组内的一个消费者,这可以实现多个消费者之间的负载平衡。

动物园管理员:

  • ZooKeeper 管理和协调 Kafka 代理,跟踪代理、主题和分区。
  • 它有助于管理分区的领导者选举并监控集群健康状况。

Kafka 用例

  • 实时分析:公司使用 Kafka 实时处理和分析数据流,以用于监控系统,例如金融交易分析。
  • 日志聚合:Kafka 整合来自多个服务或应用程序的日志,以进行处理、警报或存储。
  • 数据管道:Kafka 用作在不同系统或服务(ETL 管道)之间传输大量数据的骨干。
  • 物联网应用:Kafka 可以处理来自物联网传感器的数据流,从而实现实时分析和响应。
  • 微服务通信:Kafka 作为微服务架构的可靠消息传递平台,支持异步、解耦通信。
  • 实时车辆跟踪:以下示例说明了如何使用 Kafka 实时跟踪车辆。

使用 Python 演示如何在实时场景中使用 Kafka 的示例 :

拼车应用程序的位置跟踪。

为了简单起见,我们将使用 kafka-python 库创建一个生产者(以模拟发送位置更新的驱动程序)和一个消费者(以模拟处理这些位置更新的服务)。

1。设置 Kafka
确保您在本地运行 Kafka 或使用云提供商。您可以按照 Kafka 快速入门指南在本地下载并运行 Kafka。

2。安装 Kafka Python 库
您可以使用 pip 安装 Kafka Python 库:

pip install kafka-python
登录后复制
登录后复制

3。 Python Kafka Producer(模拟驱动程序位置更新)
生产者模拟驱动程序向 Kafka 主题发送位置更新(驱动程序位置)。

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)
登录后复制
登录后复制

4。 Python Kafka Consumer(模拟乘车匹配服务)
消费者从司机位置主题读取位置更新并处理它们。

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()
登录后复制

说明:

生产者(发送位置更新的司机):

  • 生产者将一个 JSON 对象发送到 Kafka 主题 driver-location,其中包含 driver_id、纬度、经度和时间戳等字段。
  • 生产者通过每 5 秒发送一次位置数据来模拟实时 GPS 更新。

消费者(拼车服务):

  • 消费者订阅驾驶员位置主题,监听更新。
  • 每次将位置更新发布到 Kafka 时,消费者都会处理并打印它,模拟使用此数据来匹配司机和乘客的服务。

运行示例(我在 Windows 机器上运行):

  1. 启动 Zookeeper
pip install kafka-python
登录后复制
登录后复制
  1. 启动本地 Kafka 服务器。
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)
登录后复制
登录后复制

现在使用 python 在 2 个单独的终端窗口中运行生产者和消费者。

  1. 运行生产者脚本来模拟驱动程序发送位置更新。

  2. 运行消费者脚本以查看乘车匹配服务实时处理位置更新。

结论
Apache Kafka 提供了一个用于管理实时数据流的卓越平台。通过将 Kafka 与 Python 相结合,开发人员可以构建强大的数据管道和实时分析解决方案。

无论是车辆跟踪、物联网数据还是实时仪表板,Kafka with Python 都具有高度可扩展性,可以适应各种用例。因此,开始尝试 Kafka,您将会对其在实际应用中的潜力感到惊讶!

以上是使用 Python 的 Kafka 初学者指南:实时数据处理和应用程序的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

<🎜>:泡泡胶模拟器无穷大 - 如何获取和使用皇家钥匙
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
北端:融合系统,解释
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
Mandragora:巫婆树的耳语 - 如何解锁抓钩
3 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

热门话题

Java教程
1671
14
CakePHP 教程
1428
52
Laravel 教程
1329
25
PHP教程
1276
29
C# 教程
1256
24
Python与C:学习曲线和易用性 Python与C:学习曲线和易用性 Apr 19, 2025 am 12:20 AM

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

Python和时间:充分利用您的学习时间 Python和时间:充分利用您的学习时间 Apr 14, 2025 am 12:02 AM

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python vs.C:探索性能和效率 Python vs.C:探索性能和效率 Apr 18, 2025 am 12:20 AM

Python在开发效率上优于C ,但C 在执行性能上更高。1.Python的简洁语法和丰富库提高开发效率。2.C 的编译型特性和硬件控制提升执行性能。选择时需根据项目需求权衡开发速度与执行效率。

学习Python:2小时的每日学习是否足够? 学习Python:2小时的每日学习是否足够? Apr 18, 2025 am 12:22 AM

每天学习Python两个小时是否足够?这取决于你的目标和学习方法。1)制定清晰的学习计划,2)选择合适的学习资源和方法,3)动手实践和复习巩固,可以在这段时间内逐步掌握Python的基本知识和高级功能。

Python vs. C:了解关键差异 Python vs. C:了解关键差异 Apr 21, 2025 am 12:18 AM

Python和C 各有优势,选择应基于项目需求。1)Python适合快速开发和数据处理,因其简洁语法和动态类型。2)C 适用于高性能和系统编程,因其静态类型和手动内存管理。

Python标准库的哪一部分是:列表或数组? Python标准库的哪一部分是:列表或数组? Apr 27, 2025 am 12:03 AM

pythonlistsarepartofthestAndArdLibrary,herilearRaysarenot.listsarebuilt-In,多功能,和Rused ForStoringCollections,而EasaraySaraySaraySaraysaraySaraySaraysaraySaraysarrayModuleandleandleandlesscommonlyusedDduetolimitedFunctionalityFunctionalityFunctionality。

Python:自动化,脚本和任务管理 Python:自动化,脚本和任务管理 Apr 16, 2025 am 12:14 AM

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。

科学计算的Python:详细的外观 科学计算的Python:详细的外观 Apr 19, 2025 am 12:15 AM

Python在科学计算中的应用包括数据分析、机器学习、数值模拟和可视化。1.Numpy提供高效的多维数组和数学函数。2.SciPy扩展Numpy功能,提供优化和线性代数工具。3.Pandas用于数据处理和分析。4.Matplotlib用于生成各种图表和可视化结果。

See all articles