kafka는 단지 작은 링크일 뿐입니다. 데이터 전송 및 전송에 자주 사용됩니다. Kafka의 공식 예에는 실제로 관련 PHP 구현 버전이 없습니다. 현재 인터넷에 유통되고 있는 Kafka 관련 PHP 라이브러리는 모두 프로그래밍 매니아가 직접 작성한 클래스 라이브러리이므로 통일된 인터페이스 표준은 없을 것입니다.
다음은 관련 kafka PHP 확장 라이브러리의 사용을 보여주기 위해 특정 클래스 라이브러리를 예로 사용합니다. 여러 kafka php 라이브러리를 종합적으로 비교한 후 Su Nan 삼촌은 다음 오픈 소스 라이브러리인 nmred/kafka-php가 더 간단하고 편리하다고 느꼈습니다.
작곡가의 기본적인 사용 문제는 저의 작곡가 관련 글을 확인하시면 됩니다. https://newsn.net/tag/composer/.
composer require "nmred/kafka-php" -vvv
물론 미러를 사용하여 다운로드 속도를 높일 수도 있습니다.
{ "config": { "secure-http": false, "preferred-install": "dist", "sort-packages": true }, "repositories": { "packagist": { "type": "composer", "url": "https://packagist.phpcomposer.com" } }, "require": { "nmred/kafka-php": "v0.2.0.7" } }
로컬 포트를 9092로 선택했고, 토픽은 test1이고, 내 로컬 카프카 버전이 0.11.0.0인지 확인합니다. 이것들은 모두 코드에서 사용됩니다.
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger('my_logger');// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test1', //注意对应topic 'key' => 'testkey', 'value' => 'test....message.', ), );});// $producer->setLogger($logger);$producer->success(function($result) { var_dump($result);});$producer->error(function($errorCode) { var_dump($errorCode);});$producer->send(true);
<?phprequire 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test1', 'value' => 'test1....message.', 'key' => 'key'.$i, ), )); var_dump($result);}
이 두 프로듀서의 코드는 다음 셸 명령으로 받을 수 있습니다.
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) { var_dump($message);});
이 소비자 코드는 다음 쉘 명령을 통해 데이터를 보낼 수 있습니다.
kafka-console-producer --broker-list localhost:9092 --topic test1
이 소비자 코드는 웹 페이지에서도 실행될 수 있다는 점은 특별히 설명할 가치가 있습니다. 페이지에 관련 데이터가 실시간으로 표시됩니다. PHP 측은 무한히 긴 끝없는 상태로 추정됩니다.
물론 모든 사람이 이러한 실험을 수행하려면 사육사 및 카프카를 활성화해야 한다는 것을 기억해야 합니다. 관련 Kafka 설치 문제를 보려면 여기를 클릭하십시오. https://newsn.net/tag/kafka/ .
이 글에 관련된 클래스 라이브러리 주소는 https://github.com/weiboad/kafka-php입니다. 이 오픈소스 클래스 라이브러리는 중국 작품인 것 같아서 중국 문서도 있습니다. 여기를 참조하세요. https://github.com/weiboad/kafka-php/blob/master/README_CH.md.
관련 권장 사항:
위 내용은 PHP는 Kafka 구현 코드로 데이터를 보냅니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!