PHP sends data to kafka implementation code

小云云
Release: 2023-03-20 19:50:02
Original
4951 people have browsed it

kafka is just a small link. Often used for sending and transferring data. In the official example of kafka, there is actually no relevant implementation version of php. The Kafka-related PHP libraries currently circulating on the Internet are all class libraries written by programming enthusiasts, so there will definitely not be a unified interface standard.

The following takes a certain class library as an example to show the use of the relevant kafka PHP extension library. After comprehensively comparing several Kafka PHP libraries, Uncle Su Nan feels that the following open source library, nmred/kafka-php, is simpler and more convenient.

First, install the composer class library nmred/kafka-php.

For basic usage issues of composer, you can check out my composer-related articles. https://newsn.net/tag/composer/.

composer require "nmred/kafka-php" -vvv
Copy after login

Of course, you can also use mirrors to speed up downloads.

{
  "config": {
      "secure-http": false,
      "preferred-install": "dist",
      "sort-packages": true
  },
  "repositories": {
      "packagist": {
          "type": "composer",
          "url": "https://packagist.phpcomposer.com"
      }
  },
  "require": {
    "nmred/kafka-php": "v0.2.0.7"
  }
}
Copy after login

Determine the port and topic, check the kafka version number

I chose the local port as 9092, the topic is test1, and check that my local kafka version is 0.11.0.0. These are all used in the code.

php如何发送数据到kafka - kafka_version.png

php如何发送数据到kafka - receive.png

##Producer code (asynchronous)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// // Create the logger// $logger = new Logger(&#39;my_logger&#39;);// // Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer(function() {
    return array(
        array(
            'topic' => 'test1',     //注意对应topic
            'key' => 'testkey',
            'value' => 'test....message.',
            ),
    );});// $producer->setLogger($logger);$producer->success(function($result) {
    var_dump($result);});$producer->error(function($errorCode) {
    var_dump($errorCode);});$producer->send(true);
Copy after login

Producer code (synchronous)

<?phprequire &#39;vendor/autoload.php&#39;;date_default_timezone_set(&#39;PRC&#39;);// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger(&#39;my_logger&#39;);// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ProducerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.11.0.0');$config->setRequiredAck(1);$config->setIsAsyn(false);$config->setProduceInterval(500);$producer = new \Kafka\Producer();// $producer->setLogger($logger);for($i = 0; $i < 100; $i++) {
  $result = $producer->send(array(
    array(
        'topic' => 'test1',
        'value' => 'test1....message.',
        'key' => 'key'.$i,
    ),
  ));
  var_dump($result);}
Copy after login

The codes of these two producers can be received using the following shell commands.

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
Copy after login
Consumer code

require 'vendor/autoload.php';date_default_timezone_set('PRC');// use Monolog\Logger;// use Monolog\Handler\StdoutHandler;// Create the logger// $logger = new Logger('my_logger');// Now add some handlers// $logger->pushHandler(new StdoutHandler());$config = \Kafka\ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.11.0.0');$config->setTopics(array('test1'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();//$consumer->setLogger($logger);$consumer->start(function($topic, $part, $message) {
    var_dump($message);});
Copy after login

This consumer code can send data through the following shell command.

kafka-console-producer --broker-list localhost:9092 --topic test1
Copy after login
It is worth mentioning that this consumer code can also be executed on the web page. The page will display relevant data in real time. It is estimated that the PHP side is an infinite state.

Of course, everyone must remember to enable zookeeper and kafka in order to do these experiments. For related Kafka installation issues, please click here to view. https://newsn.net/tag/kafka/ .

The class library address involved in this article is: https://github.com/weiboad/kafka-php. This open source class library seems to be a Chinese work, so there is a Chinese document. See here. https://github.com/weiboad/kafka-php/blob/master/README_CH.md.

Related recommendations:

Example sharing of PHP extension kafka under Linux

Kafka client written in PHP

Usage of kafka assembly and Kafka-PHP extension

The above is the detailed content of PHP sends data to kafka implementation code. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!