ホームページ バックエンド開発 Python チュートリアル Python を使用した Kafka の初心者ガイド: リアルタイム データ処理とアプリケーション

Python を使用した Kafka の初心者ガイド: リアルタイム データ処理とアプリケーション

Nov 05, 2024 pm 05:41 PM

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

カフカの紹介

  • Kafka は、Apache によって開発されたオープンソースの分散イベント ストリーミング プラットフォームです。
  • もともと LinkedIn によって作成されたこのサービスは、高スループット、フォールト トレラント、リアルタイムのデータ ストリーミングを処理するように設計されました。
  • Kafka を使用すると、システムはレコード (メッセージ) のストリームをパブリッシュおよびサブスクライブし、効率的に処理して保存できます。

なぜ Kafka が使用されるのですか?

  • 高スループット: Kafka は 1 秒あたり数百万のメッセージを処理できます。
  • フォールト トレランス: Kafka は分散されており、複数のノード間でデータを複製して信頼性を確保できます。
  • 耐久性: Kafka はデータをディスクに永続化し、メッセージを再生できるため、メッセージ配信の信頼性が確保されます。
  • リアルタイム処理: Kafka はデータのストリームをリアルタイムで処理でき、監視、分析、イベント駆動型システムなどのアプリケーションに最適です。
  • スケーラビリティ: Kafka は、大量のデータを処理するブローカーを追加することで簡単に拡張できます。
  • システムの分離: Kafka はメッセージングの中間層として機能し、異なるシステムが非同期に通信できるようにします。

カフカ アーキテクチャ

A Beginner’s Guide to Kafka with Python: Real-Time Data Processing and Applications

コンポーネント:

プロデューサー:
これらは、Kafka にデータ/メッセージを送信するアプリケーションまたはサービスです。プロデューサーは、Kafka 内の特定のトピックにメッセージをプッシュします。

トピック:
トピックは、レコードが公開されるカテゴリまたはフィードの名前です。トピックはスケーラビリティと並列処理を考慮して分割されています。

パーティション:

  • 各トピックは 1 つ以上のパーティションに分割されます。
  • パーティションにより、Kafka はより多くのメッセージとサポートを処理できるようになります 並列処理。
  • 各パーティションには一意の ID があり、パーティションのサブセットを保存できます。 トピックのデータ。

ブローカー:

  • Kafka はブローカー (サーバー) のクラスターとして実行され、それぞれがデータを処理します 複数のトピックとパーティション用。
  • ブローカーはパーティションを保存および管理し、読み取りと書き込みを処理します 生産者と消費者からのリクエスト
  • 各ブローカーは一意の ID によって識別されます。

消費者:

コンシューマは、トピックからメッセージを読み取るアプリケーションまたはサービスです。
消費者はトピックをサブスクライブし、Kafka ブローカーからデータを取得します。

消費者グループ:

  • 消費者は消費者グループに組織されます。
  • パーティション内の各メッセージはグループ内の 1 つのコンシューマにのみ配信されるため、複数のコンシューマ間で負荷分散が可能になります。

動物園の飼育員:

  • ZooKeeper は、Kafka ブローカーを管理および調整し、ブローカー、トピック、パーティションを追跡します。
  • パーティションのリーダー選出の管理とクラスターの健全性の監視に役立ちます。

Kafka の使用例

  • リアルタイム分析: 企業は Kafka を使用して、金融取引分析などの監視システムのデータ ストリームをリアルタイムで処理および分析します。
  • ログ集約: Kafka は、処理、アラート、または保存のために複数のサービスまたはアプリケーションからのログを統合します。
  • データ パイプライン: Kafka は、異なるシステムまたはサービス (ETL パイプライン) 間で大量のデータを転送するためのバックボーンとして使用されます。
  • IoT アプリケーション: Kafka は IoT センサーからのデータ ストリームを処理できるため、リアルタイムの分析と応答が可能になります。
  • マイクロサービス通信: Kafka は、マイクロサービス アーキテクチャの信頼できるメッセージング プラットフォームとして機能し、非同期の分離された通信を可能にします。
  • リアルタイム車両追跡: 次の例は、Kafka を使用してリアルタイムで車両を追跡する方法を示しています。

リアルタイム シナリオで Kafka を使用する方法を示す Python を使用した例:

ライドシェアリングアプリの位置追跡。

簡単にするために、kafka-python ライブラリを使用して、プロデューサー (位置更新を送信するドライバーをシミュレートするため) とコンシューマー (位置更新を処理するサービスをシミュレートするため) の両方を作成します。

1. Kafka のセットアップ
Kafka がローカルで実行されていることを確認するか、クラウド プロバイダーを使用してください。 Kafka クイックスタート ガイドに従って、Kafka をローカルにダウンロードして実行できます。

2. Kafka Python ライブラリをインストールします
pip:
を使用して Kafka Python ライブラリをインストールできます。

pip install kafka-python
ログイン後にコピー
ログイン後にコピー

3. Python Kafka プロデューサー (ドライバーの場所の更新のシミュレーション)
プロデューサは、位置の更新を Kafka トピック (driver-location) に送信するドライバーをシミュレートします。

from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)
ログイン後にコピー
ログイン後にコピー

4. Python Kafka Consumer (乗車マッチング サービスのシミュレーション)
コンシューマーは、ドライバーの位置トピックから位置の更新を読み取り、処理します。

from kafka import KafkaConsumer
import json

# Kafka Consumer
consumer = KafkaConsumer(
    'driver-location',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True,
    group_id='location-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize data from JSON
)

def process_location_updates():
    print("Waiting for location updates...")
    for message in consumer:
        location = message.value
        driver_id = location['driver_id']
        latitude = location['latitude']
        longitude = location['longitude']
        timestamp = location['timestamp']
        print(f"Received location update for Driver {driver_id}: ({latitude}, {longitude}) at {timestamp}")

# Start consuming location updates
process_location_updates()
ログイン後にコピー

説明:

プロデューサー (位置情報の更新を送信するドライバー):

  • プロデューサは、driver_id、緯度、経度、タイムスタンプなどのフィールドを含む JSON オブジェクトを Kafka トピックの driver-location に送信します。
  • プロデューサーは、5 秒ごとに位置データを送信することで、リアルタイムの GPS 更新をシミュレートします。

消費者 (配車サービス):

  • コンシューマはドライバーの場所のトピックをサブスクライブし、更新をリッスンします。
  • 位置情報の更新が Kafka に公開されるたびに、コンシューマーはそれを処理して印刷し、このデータを使用してドライバーと乗客を照合するサービスをシミュレートします。

例の実行 (Windows マシンで実行しています):

  1. 動物園飼育員を開始
pip install kafka-python
ログイン後にコピー
ログイン後にコピー
  1. ローカルの Kafka サーバーを起動します。
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data to JSON
)

def send_location_updates(driver_id):
    while True:
        # Simulating random GPS coordinates (latitude, longitude)
        location = {
            "driver_id": driver_id,
            "latitude": round(random.uniform(40.0, 41.0), 6),
            "longitude": round(random.uniform(-74.0, -73.0), 6),
            "timestamp": time.time()
        }
        # Send location data to Kafka
        producer.send('driver-location', location)
        print(f"Sent: {location}")
        time.sleep(5)  # Sleep for 5 seconds to simulate real-time updates

# Start sending updates for driver_id = 101
send_location_updates(driver_id=101)
ログイン後にコピー
ログイン後にコピー

ここで、Python を使用して 2 つの別々のターミナル ウィンドウでプロデューサーとコンシューマーを実行します。

  1. プロデューサー スクリプトを実行して、位置の更新を送信するドライバーをシミュレートします。

  2. コンシューマ スクリプトを実行して、配車マッチング サービスが位置情報の更新をリアルタイムで処理していることを確認します。

結論
Apache Kafka は、リアルタイム データ ストリームを管理するための優れたプラットフォームを提供します。 Kafka と Python を組み合わせることで、開発者は強力なデータ パイプラインとリアルタイム分析ソリューションを構築できます。

車両追跡、IoT データ、リアルタイム ダッシュボードのいずれであっても、Python を使用した Kafka は拡張性が高く、さまざまなユースケースに適応できます。それでは、Kafka の実験を開始してください。そうすれば、現実世界のアプリケーションにおけるその可能性に驚かれるでしょう!

以上がPython を使用した Kafka の初心者ガイド: リアルタイム データ処理とアプリケーションの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットな記事タグ

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

HTMLを解析するために美しいスープを使用するにはどうすればよいですか? HTMLを解析するために美しいスープを使用するにはどうすればよいですか? Mar 10, 2025 pm 06:54 PM

HTMLを解析するために美しいスープを使用するにはどうすればよいですか?

Pythonでの画像フィルタリング Pythonでの画像フィルタリング Mar 03, 2025 am 09:44 AM

Pythonでの画像フィルタリング

Pythonを使用してテキストファイルのZIPF配布を見つける方法 Pythonを使用してテキストファイルのZIPF配布を見つける方法 Mar 05, 2025 am 09:58 AM

Pythonを使用してテキストファイルのZIPF配布を見つける方法

Pythonを使用してPDFドキュメントの操作方法 Pythonを使用してPDFドキュメントの操作方法 Mar 02, 2025 am 09:54 AM

Pythonを使用してPDFドキュメントの操作方法

DjangoアプリケーションでRedisを使用してキャッシュする方法 DjangoアプリケーションでRedisを使用してキャッシュする方法 Mar 02, 2025 am 10:10 AM

DjangoアプリケーションでRedisを使用してキャッシュする方法

TensorflowまたはPytorchで深い学習を実行する方法は? TensorflowまたはPytorchで深い学習を実行する方法は? Mar 10, 2025 pm 06:52 PM

TensorflowまたはPytorchで深い学習を実行する方法は?

Pythonオブジェクトのシリアル化と脱介入:パート1 Pythonオブジェクトのシリアル化と脱介入:パート1 Mar 08, 2025 am 09:39 AM

Pythonオブジェクトのシリアル化と脱介入:パート1

Pythonで独自のデータ構造を実装する方法 Pythonで独自のデータ構造を実装する方法 Mar 03, 2025 am 09:28 AM

Pythonで独自のデータ構造を実装する方法

See all articles