PHP はデータを Kafka 実装コードに送信します

小云云
リリース: 2023-03-20 19:50:02
オリジナル
5009 人が閲覧しました

kafka は単なる小さなリンクです。データの送信や転送によく使用されます。 kafka の公式の例では、実際には、関連する php の実装バージョンはありません。現在インターネット上で流通している Kafka 関連の PHP ライブラリはすべて、プログラミング愛好家自身が作成したクラス ライブラリであるため、統一されたインターフェイス標準は存在しません。

以下では、関連する kafka PHP 拡張ライブラリの使用方法を示すために、例として特定のクラス ライブラリを取り上げます。いくつかの kafka php ライブラリを包括的に比較した結果、Su Nan おじさんは、次のオープンソース ライブラリ nmred/kafka-php の方がシンプルで便利だと感じています。

まず、composer クラス ライブラリ nmred/kafka-php をインストールします。

composer の基本的な使用法の問題については、composer 関連の記事を参照してください。 https://newsn.net/tag/composer/。

composer require "nmred/kafka-php" -vvv
ログイン後にコピー

もちろん、ミラーを使用してダウンロードを高速化することもできます。

{
  "config": {
      "secure-http": false,
      "preferred-install": "dist",
      "sort-packages": true
  },
  "repositories": {
      "packagist": {
          "type": "composer",
          "url": "https://packagist.phpcomposer.com"
      }
  },
  "require": {
    "nmred/kafka-php": "v0.2.0.7"
  }
}
ログイン後にコピー

ポートとトピックを決定し、kafka のバージョン番号を確認します

ローカル ポートを 9092 に選択し、トピックは test1 で、ローカルの kafka バージョンが 0.11.0.0 であることを確認します。これらはすべてコード内で使用されます。

php如何发送数据到kafka - kafka_version.png

php如何发送数据到kafka - receive.png

プロデューサーコード(非同期)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger(&#39;my_logger&#39;);// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() {
    return array(
        array(
            'topic' => 'test1',     //注意对应topic
            'key' => 'testkey',
            'value' => 'test....message.',
            ),
    );});// $producer->setLogger($logger);$producer->success(function($result) {
    var_dump($result);});$producer->error(function($errorCode) {
    var_dump($errorCode);});$producer->send(true);
ログイン後にコピー

プロデューサーコード(同期)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger(&#39;my_logger&#39;);// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) {
  $result = $producer->send(array(
    array(
        'topic' => 'test1',
        'value' => 'test1....message.',
        'key' => 'key'.$i,
    ),
  ));
  var_dump($result);}
ログイン後にコピー

これら2つのプロデューサーのコードは、以下のシェルコマンドで受け取ることができます。

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
ログイン後にコピー

コンシューマ コード

require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) {
    var_dump($message);});
ログイン後にコピー

このコンシューマ コードは、次のシェル コマンドを通じてデータを送信できます。

kafka-console-producer --broker-list localhost:9092 --topic test1
ログイン後にコピー

このコンシューマ コードが Web ページ上でも実行できることは特別な説明に値します。ページには関連データがリアルタイムで表示されます。 PHP側は無限状態になっていると推測されます。

もちろん、これらの実験を行うには、Zookeeper と Kafka を有効にすることを忘れないでください。 Kafka のインストールに関連する問題については、ここをクリックして参照してください。 https://newsn.net/tag/kafka/ 。

この記事に関係するクラス ライブラリのアドレスは、https://github.com/weiboad/kafka-php です。このオープンソースのクラスライブラリは中国の作品らしく、中国語のドキュメントがあります。ここを参照してください。 https://github.com/weiboad/kafka-php/blob/master/README_CH.md。

関連する推奨事項:

Linux での php 拡張機能 kafka の共有例

PHP で書かれた Kafka クライアント

kafka アセンブリと Kafka-PHP 拡張機能の使用

以上がPHP はデータを Kafka 実装コードに送信しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート