卡夫卡簡介
為什麼要使用Kafka?
卡夫卡建築
組件:
製作人:
這些是將資料/訊息發送到 Kafka 的應用程式或服務。生產者將訊息推送到 Kafka 中的特定主題。
主題:
主題是發布記錄的類別或提要名稱。主題被分區以允許可擴展性和並行性。
分區:
經紀人:
消費者:
消費者是從主題讀取訊息的應用程式或服務。
消費者訂閱主題,從 Kafka 代理中提取資料。
消費群:
動物園管理員:
Kafka 用例
使用 Python 示範如何在即時場景中使用 Kafka 的範例 :
共乘應用程式的位置追蹤。
為了簡單起見,我們將使用 kafka-python 庫建立一個生產者(以模擬發送位置更新的驅動程式)和一個消費者(以模擬處理這些位置更新的服務)。
1。設定 Kafka
確保您在本地運行 Kafka 或使用雲端提供者。您可以按照 Kafka 快速入門指南在本機下載並執行 Kafka。
2。安裝 Kafka Python 函式庫
您可以使用 pip 安裝 Kafka Python 函式庫:
pip install kafka-python
3。 Python Kafka Producer(模擬驅動程式位置更新)
生產者模擬驅動程式向 Kafka 主題發送位置更新(驅動程式位置)。
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
4。 Python Kafka Consumer(模擬乘車匹配服務)
消費者從司機位置主題讀取位置更新並處理它們。
from kafka import KafkaConsumer import json # Kafka Consumer consumer = KafkaConsumer( 'driver-location', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', # Start from the earliest message enable_auto_commit=True, group_id='location-group', value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Deserialize data from JSON ) def process_location_updates(): print("Waiting for location updates...") for message in consumer: location = message.value driver_id = location['driver_id'] latitude = location['latitude'] longitude = location['longitude'] timestamp = location['timestamp'] print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}") # Start consuming location updates process_location_updates()
說明:
生產者(發送位置更新的司機):
消費者(共乘服務):
運行範例(我在 Windows 機器上運行):
pip install kafka-python
from kafka import KafkaProducer import json import time import random # Kafka Producer producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') # Serialize data to JSON ) def send_location_updates(driver_id): while True: # Simulating random GPS coordinates (latitude, longitude) location = { "driver_id": driver_id, "latitude": round(random.uniform(40.0, 41.0), 6), "longitude": round(random.uniform(-74.0, -73.0), 6), "timestamp": time.time() } # Send location data to Kafka producer.send('driver-location', location) print(f"Sent: {location}") time.sleep(5) # Sleep for 5 seconds to simulate real-time updates # Start sending updates for driver_id = 101 send_location_updates(driver_id=101)
現在使用 python 在 2 個單獨的終端視窗中運行生產者和消費者。
運行生產者腳本來模擬驅動程式發送位置更新。
執行消費者腳本以查看乘車匹配服務即時處理位置更新。
結論
Apache Kafka 提供了一個用於管理即時資料流的卓越平台。透過將 Kafka 與 Python 結合,開發人員可以建立強大的資料管道和即時分析解決方案。
無論是車輛追蹤、物聯網資料或即時儀表板,Kafka with Python 都具有高度可擴充性,可以適應各種用例。因此,開始嘗試 Kafka,您將對其在實際應用中的潛力感到驚訝!
以上是使用 Python 的 Kafka 初學者指南:即時資料處理和應用程式的詳細內容。更多資訊請關注PHP中文網其他相關文章!