PHP と Apache Kafka を使用してリアルタイム ストリーム処理を実装する方法

WBOY
リリース: 2023-06-28 12:02:02
オリジナル
1302 人が閲覧しました

Apache Kafka は、高スループット、低遅延の分散パブリッシュ/サブスクライブ メッセージング システムです。高周波数、大容量のデータ ストリームを処理するリアルタイム ストリーム処理システムのアーキテクチャで広く使用されています。この記事では、PHP と Apache Kafka を使用してリアルタイム ストリーム処理を実装する方法を紹介します。

  1. Apache Kafka のインストール

Apache Kafka の使用を開始する前に、まず Apache Kafka をインストールする必要があります。公式 Web サイトから Apache Kafka をダウンロードしてインストールすることも、オープンソースのインストール スクリプトを使用することもできます。ここでは、Apache Kafka が提供するバイナリ バージョンを使用します。

  1. Kafka プロデューサーの作成

次に、Kafka クラスターにデータをプッシュする Kafka プロデューサーを作成します。 PHP では、kafka-php 拡張機能を使用してこれを実現できます。

まず、kafka-php 拡張機能をダウンロードしてコンパイルする必要があります。詳しいインストール手順は、kafka-php の GitHub ページでご覧いただけます。インストールが完了したら、PHP コードで kafka-php 拡張機能を使用できるようになります。

次の例は、Kafka プロデューサを作成し、トピックにメッセージを送信する方法を示しています。

<?php
require_once('KafkaProducer.php');

$producer = new KafkaProducer('localhost:9092');
$producer->send([
    [
        'topic' => 'example-topic',
        'value' => 'Hello, Kafka!',
        'key' => 'key1'
    ]
]);
?>
ログイン後にコピー

上記のコードでは、最初に KafkaProducer オブジェクトを作成し、そのアドレスを指定します。カフカクラスタ。次に、send メソッドを通じてトピック (example-topic) にメッセージを送信しました。

送信されるメッセージは、メッセージの件名、内容、キーを含む配列です。キーを使用してメッセージをグループ化すると、Kafka クラスターが同じキーを持つメッセージを同じパーティションに分散できるようになります。

  1. Kafka コンシューマーの作成

次に、Kafka クラスターからのデータを消費する Kafka コンシューマーを作成します。同様に、PHP では、kafka-php 拡張機能を使用してこれを実現できます。

<?php
require_once('KafkaConsumer.php');

$consumer = new KafkaConsumer('localhost:9092', 'example-group', ['example-topic']);
$consumer->consume(function($message) {
    echo $message->payload . "
";
});
?>
ログイン後にコピー

上記のコードでは、まず KafkaConsumer オブジェクトを作成し、Kafka クラスターのアドレス、コンシューマー グループ (グループ) の名前、および消費されるトピックを指定します。次に、consumption メソッドを通じてデータの消費を開始します。

consume メソッドは、Kafka クラスターから受信したメッセージを処理するためのパラメーターとしてコールバック関数を受け入れます。コールバック関数では、メッセージのコンテンツ (ペイロード) にアクセスできます。

Kafka コンシューマの作成時にコンシューマ グループの名前を指定したことに注意してください。コンシューマ グループは Kafka の重要な概念であり、メッセージをパーティションに分散するために使用されます。同じコンシューマ グループ名を持つコンシューマは同じトピックを一緒に消費し、Kafka はそれらのコンシューマ間でメッセージを自動的に配布します。コンシューマ グループの目的は、各メッセージが 1 回だけ消費されるようにすることです。

  1. リアルタイム ストリーム処理

ここで、上記の 2 つの例を組み合わせて、リアルタイム ストリーム処理を実現します。 Kafka プロデューサーを作成し、トピックに定期的にメッセージを送信できます。次に、コールバック関数でトピックから受信したメッセージを処理する Kafka コンシューマーを作成できます。

次は、リアルタイム ストリーム処理を示す例です:

<?php
require_once('KafkaProducer.php');
require_once('KafkaConsumer.php');

$producer = new KafkaProducer('localhost:9092');
$consumer = new KafkaConsumer('localhost:9092', 'example-group', ['example-topic']);

while (true) {
    $producer->send([
        [
            'topic' => 'example-topic',
            'value' => rand(0, 10),
            'key' => 'key1'
        ]
    ]);

    $consumer->consume(function($message) {
        $value = $message->payload;
        echo "Received $value
";
    });

    sleep(1);
}
?>
ログイン後にコピー

上記のコードでは、最初に Kafka プロデューサと Kafka コンシューマを作成します。次に、トピックに定期的に乱数を送信し、トピックからのメッセージを消費するループに入ります。コンシューマ コールバック関数では、受信した値をコンソールに出力します。

ここで説明するのは、単純なリアルタイム ストリーム処理プロセスです。実際には、リアルタイム ストリーム処理システムはより複雑で、複数のプロデューサーとコンシューマーが存在し、複数のトピックとパーティションが存在する場合があります。いずれにしても、PHP と Apache Kafka を使用すると、リアルタイム ストリーム処理システムを簡単に構築し、高頻度で大容量のデータ ストリームを処理できます。

以上がPHP と Apache Kafka を使用してリアルタイム ストリーム処理を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!