Kafka Stream は、ストリーム コンピューティング エンジンとして、リアルタイム データを迅速に処理し、すぐに使える分散処理機能を提供します。人気の開発言語である PHP は、その優れた言語機能と拡張ライブラリを使用して、Kafka Stream データ処理を実装することもできます。
この記事では、PHP を使用して Kafka ストリームのリアルタイム データ処理を開発する方法を紹介し、例を使用してオブザーバー モードによって生成されたリアルタイム データを PHP を使用して分析する方法を示します。
Kafka Stream は、リアルタイム データを確実に処理し、すぐに使える分散型データを提供できる、高速で安定したストリーム コンピューティング エンジンです。処理能力。 Kafka Stream は、Kafka トピック内のメッセージを消費し、処理のためにアプリケーションに送信し、処理された結果を Kafka トピックに送り返すことにより、効率的かつ柔軟なデータ処理方法です。
PHP では、Kafka Stream によって公式に提供される Kafka-PHP ライブラリを通じて、PHP アプリケーションを Kafka Stream と簡単に統合できます。 。 Kafka-PHP ライブラリでサポートされている Kafka Stream のバージョンは次のとおりです:
Kafka-PHP ライブラリは、次のコア機能を提供します。 :
さらに、Kafka-PHP ライブラリは PHP の Swoole 拡張機能のサポートも提供しており、Swoole 拡張機能を使用することで PHP アプリケーションのパフォーマンスをさらに向上させることができます。
Observer パターンは、オブジェクト間の 1 対多の依存関係を定義する動作設計パターンです。それに依存するオブジェクトは通知され、自動的に更新されます。オブザーバー パターンは、イベント監視、UI プログラミング、その他の分野で広く使用されており、効率的なメッセージ配信と処理を実現できます。
以下では、サンプル コードを使用して、PHP を使用して Kafka Stream のリアルタイム データ処理を開発し、適用する方法を示します。データ分析用のオブザーバー モード。
4.1 Kafka プロデューサーの実装
まず、Kafka トピックにメッセージを送信するプロデューサーを作成する必要があります。以下は、簡単な Kafka プロデューサーのサンプル コードです。
<?php require_once __DIR__ . '/vendor/autoload.php'; use RdKafkaConf; use RdKafkaProducer; use RdKafkaProducerTopic; $conf = new Conf(); $conf->set('metadata.broker.list', 'kafka:9092'); $producer = new Producer($conf); $topic = $producer->newTopic('topic1'); for ($i = 0; $i < 10; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i"); } ?>
上記のコードでは、RdKafka 拡張ライブラリによって提供されるプロデューサー クラスを使用して、Kafka プロデューサを実装し、トピック内の 'topic1' という名前の Kafka にメッセージを送信します。 Kafka プロデューサーを実装するときは、Kafka クラスターが正しく接続できるように、Kafka クラスターの接続構成のセットアップに注意する必要があります。
4.2 Kafka コンシューマの実装
次に、Kafka トピックからのデータを消費する Kafka コンシューマを作成する必要があります。以下は、簡単な Kafka コンシューマ サンプル コードです。
<?php require_once __DIR__ . '/vendor/autoload.php'; use RdKafkaConf; use RdKafkaConsumer; use RdKafkaTopicPartition; $conf = new Conf(); $conf->set('metadata.broker.list', 'kafka:9092'); $consumer = new Consumer($conf); $consumer->addBrokers('kafka:9092'); $topic = $consumer->newTopic('topic1'); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 1000); if ($message === null) { continue; } if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) { echo "Received message: {$message->payload} "; } } $consumer->close(); ?>
上記のコードでは、RdKafka 拡張ライブラリによって提供される Consumer クラスを使用して、Kafka コンシューマを実装し、「topic1」という名前の Kafka トピックからデータを消費します。データをコンソールに出力します。 Kafka コンシューマを実装するときは、消費トピックと消費を開始するオフセットを設定する必要があることに注意してください。
4.3 オブザーバー パターンの実装
これで、Kafka トピックからデータを利用できるようになりましたが、オブザーバー パターンを使用してデータを分析するにはどうすればよいでしょうか?以下は、単純なオブザーバー パターンのサンプル コードです。
<?php require_once __DIR__ . '/vendor/autoload.php'; use SplObserver; use SplSubject; class Producer implements SplSubject { private array $observers = []; public function attach(SplObserver $observer):void { array_push($this->observers, $observer); } public function detach(SplObserver $observer):void { if (($key = array_search($observer, $this->observers, true)) !== false) { array_splice($this->observers, $key, 1); } } public function notify():void { foreach ($this->observers as $observer) { $observer->update($this); } } public function produce(string $message):void { echo "Producing message: {$message} "; $this->notify(); } } class Consumer implements SplObserver { public function update(SplSubject $subject):void { echo "Consuming message: {$subject} "; } } $producer = new Producer(); $producer->attach(new Consumer()); $producer->produce('Message 1'); ?>
上記のコードでは、Producer という名前のメイン クラスを定義し、SplSubject インターフェイスを実装し、オブザーバー管理メソッドのアタッチ、デタッチ、通知、生成を提供します。また、Consumer という名前のオブザーバー クラスを定義し、SplObserver インターフェイスを実装し、メッセージを処理するための update メソッドを提供しました。最後に、Producer インスタンスを作成し、Consumer インスタンスをオブザーバーとしてアタッチし、Produce メソッドを 1 回実行して、Consumer の Update メソッドをトリガーしました。
4.4 Kafka Stream のオブザーバー モード データ処理を実装する
最後に、前の 3 つの手順のコードを組み合わせて、Kafka Stream のオブザーバー モード データ処理を実装します。以下は、簡単な Kafka Stream データ処理のサンプル コードです。
<?php require_once __DIR__ . '/vendor/autoload.php'; use RdKafkaConf; use RdKafkaConsumer; use RdKafkaProducer; use RdKafkaTopicPartition; use SplSubject; use SplObserver; class KafkaStream implements SplSubject { private array $observers; private Conf $conf; private Producer $producer; private Consumer $consumer; public function __construct(string $bootstrap_servers) { $this->conf = new Conf(); $this->conf->set('metadata.broker.list', $bootstrap_servers); $this->producer = new Producer($this->conf); $this->consumer = new Consumer($this->conf); $this->observers = []; } public function attach(SplObserver $observer):void { array_push($this->observers, $observer); } public function detach(SplObserver $observer):void { if (($key = array_search($observer, $this->observers, true)) !== false) { array_splice($this->observers, $key, 1); } } public function notify():void { foreach ($this->observers as $observer) { $observer->update($this); } } public function produce(string $message, string $topic):void { echo "Producing message: {$message} "; $this->producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message); $this->notify(); } public function consume(string $topic):void { $topic_partition = new TopicPartition($topic, 0); $this->consumer->assign([$topic_partition]); $this->consumer->seek($topic_partition, 0); while (true) { $message = $this->consumer->consume(0, 1000); if ($message === null) { continue; } if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) { echo "Error: {$message->errstr()}, exiting. "; break; } echo "Consuming message: {$message->payload} "; } $this->consumer->close(); } } class Consumer implements SplObserver { public function update(SplSubject $subject):void { echo "Processing message: {$subject} "; } } $bootstrap_servers = 'kafka:9092'; $kafka_stream = new KafkaStream($bootstrap_servers); $kafka_stream->attach(new Consumer()); $kafka_stream->produce('Message 1', 'topic1'); $kafka_stream->consume('topic1'); ?>
上記のコードでは、KafkaStream という名前のクラスを定義し、SplSubject インターフェイスを実装し、Kafka Stream 処理のコア メソッドであるProduce および Consumer を提供します。オブザーバー管理メソッドとして、アタッチ、デタッチ、および通知が行われます。また、Consumer という名前のオブザーバー クラスを定義し、SplObserver インターフェイスを実装し、メッセージを処理するための update メソッドを提供しました。最後に、KafkaStream インスタンスを作成し、Consumer インスタンスをオブザーバーとしてアタッチし、Produce メソッドを 1 回実行してメッセージを生成し、Consumer メソッドでメッセージを消費して処理しました。
この記事では、PHP を使用して Kafka Stream のリアルタイム データ処理を開発する方法を紹介し、オブザーバー パターンを使用してリアルタイム データを分析する方法を示します。 Kafka Stream と Observer パターンは、大規模なリアルタイム データを迅速に処理し、効率的なメッセージ配信と処理を実現するのに役立つ強力なツールの組み合わせです。
以上がPHP はオープンソースの Kafka Stream リアルタイム データ処理を実装しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。