kafka只是個小小的紐帶。經常用於資料的發送及轉移。在kafka官方的例子中,其實並沒有php的相關實作版本。現在網路上流傳的kafka的相關php函式庫,都是些程式愛好者自己寫的類別庫,所以就肯定不會有太統一的介面標準了。
下面以某個類別庫為例,展示相關的kafka的php擴充庫使用。綜合比較了幾個kafka的php函式庫,蘇南叔公覺得下面的這個開源類別函式庫,nmred/kafka-php ,比較簡單方便一些。
composer的基本使用問題,大家可以查看我的composer相關文章。 https://newsn.net/tag/composer/ 。
composer require "nmred/kafka-php" -vvv
當然,你也可以用鏡像加速下載。
{ "config": { "secure-http": false, "preferred-install": "dist", "sort-packages": true }, "repositories": { "packagist": { "type": "composer", "url": "https://packagist.phpcomposer.com" } }, "require": { "nmred/kafka-php": "v0.2.0.7" } }
我選擇本地的連接埠是9092,topic是test1,同時查看我本地的kafka版本是0.11.0.0。這些在程式碼中都是要用到的。
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger('my_logger');// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test1', //注意对应topic 'key' => 'testkey', 'value' => 'test....message.', ), );});// $producer->setLogger($logger);$producer->success(function($result) { var_dump($result);});$producer->error(function($errorCode) { var_dump($errorCode);});$producer->send(true);
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => 'key'.$i, ), )); var_dump($result);}
這兩個生產者的程式碼,可以用下列shell指令接收。
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) { var_dump($message);});
這個消費者程式碼,可以透過下面的shell指令傳送資料。
kafka-console-producer --broker-list localhost:9092 --topic test1
值得特別說明的是,這個消費者的程式碼,在網頁裡面執行也是可以的。頁面會即時顯示相關數據。估計php端是個無限長的endless狀態。
當然,大家要記得開啟zookeeper和kafka,才能做這些實驗。相關kafka的安裝問題,請點這裡查看。 https://newsn.net/tag/kafka/ 。
本文涉及的類別庫位址是:https://github.com/weiboad/kafka-php 。這個開源類別庫,似乎是國人作品,所以有個中文文檔。見這裡。 https://github.com/weiboad/kafka-php/blob/master/README_CH.md 。
相關推薦:
以上是php傳送資料到kafka實作程式碼的詳細內容。更多資訊請關注PHP中文網其他相關文章!