PHP message queue Kafka usage

Guanhui
Release: 2023-04-08 16:42:01

Install Kafka service

Download the latest release from the official Kafka website. The command below fetches version 2.5.0 from a mirror:

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

Extract the archive and enter the directory

tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0

Start the Kafka service

Use the script in the installation package to start a single-node Zookeeper instance

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

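Use kafka-server-start.sh to start the Kafka service. Without the -daemon flag the broker runs in the foreground, so run the remaining commands in a second terminal.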

bin/kafka-server-start.sh config/server.properties

Create topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List the topics to confirm that the topic was created

bin/kafka-topics.sh --list --zookeeper localhost:2181
# Output: test

Start a console producer and send messages

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

That covers the Kafka service. The next step is the PHP side.

Install PHP extension

The rdkafka extension depends on librdkafka, so install librdkafka first

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install

Install the php-rdkafka extension

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## adjust this path to match your own environment
make && make install

Add the following line to php.ini, then restart php-fpm. The rdkafka extension should then appear in the list of loaded extensions.

extension=rdkafka.so
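
One way to confirm that the extension was picked up is to ask PHP directly. The sketch below uses only the built-in extension_loaded() and phpversion() functions; the helper name rdkafkaStatus() is made up for this illustration. Note that the CLI and php-fpm can read different php.ini files, so a result from the command line does not guarantee the same result under php-fpm.

```php
<?php
// Hypothetical helper: reports whether the rdkafka extension is available
// to the PHP runtime that executes this script.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka is NOT loaded; check the extension= line in php.ini';
    }
    // phpversion() returns the extension's version string when it is loaded.
    return 'rdkafka is loaded, version ' . phpversion('rdkafka');
}

echo rdkafkaStatus(), PHP_EOL;
```

Running `php --ini` shows which php.ini file the CLI is reading if the extension does not show up.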

Using Kafka

Create a producer class

<?php
class KafkaProducer
{
    public static $brokerList = '127.0.0.1:9092';
    public static function send($message, $topic)
    {
        self::producer($message, $topic);
    }
    public static function producer($message, $topic = 'test')
    {
        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', self::$brokerList);
        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topic);
        // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
        $producer->poll(0);
        // Wait up to 10 seconds for queued messages to be delivered
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }
    }
}

Create a consumer class

<?php
class KafkaConsumer
{
    public static $brokerList = '127.0.0.1:9092';
    public static function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', 'test');
        $rk = new \RdKafka\Consumer($conf);
        // Use the configured broker list instead of a hard-coded host
        $rk->addBrokers(self::$brokerList);
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', '100');
        $topicConf->set('offset.store.method', 'broker');
        // Start from the earliest message when no offset has been stored yet
        $topicConf->set('auto.offset.reset', 'smallest');
        $topic = $rk->newTopic('test', $topicConf);
        // Consume partition 0, resuming from the stored offset
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        while (true) {
            // The second argument is the timeout in milliseconds
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }
}
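
The producer class sends json_encode($message), so what the consumer receives in $message->payload is a JSON string that has to be decoded before use. The following is a minimal sketch of that round trip without a broker; encodePayload() and decodePayload() are hypothetical helpers written for this illustration, not part of php-rdkafka, and the sample array is invented.

```php
<?php
// Hypothetical helper: mirrors the json_encode() call in KafkaProducer,
// but fails loudly instead of silently producing "false".
function encodePayload($message): string
{
    $json = json_encode($message);
    if ($json === false) {
        throw new \InvalidArgumentException('Cannot encode message: ' . json_last_error_msg());
    }
    return $json;
}

// Hypothetical helper: what a consumer could do with $message->payload.
function decodePayload(string $payload)
{
    $data = json_decode($payload, true);
    if (json_last_error() !== JSON_ERROR_NONE) {
        throw new \InvalidArgumentException('Invalid JSON payload: ' . json_last_error_msg());
    }
    return $data;
}

$payload = encodePayload(['order_id' => 42, 'status' => 'paid']);
echo $payload, PHP_EOL;         // {"order_id":42,"status":"paid"}

$data = decodePayload($payload);
echo $data['status'], PHP_EOL;  // paid
```

In the consumer loop, the RD_KAFKA_RESP_ERR_NO_ERROR branch is the place where a call such as decodePayload($message->payload) would go instead of var_dump($message).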

Problem summary

1. No Java runtime present, requesting install

Kafka runs on the Java platform, so a Java runtime must be installed first. You can go to javase-jdk14-downloads, choose the version that matches your system, and download and install it.

2. Create topic. Replication factor: 1 larger than available brokers: 0

This error means the topic needs at least one broker, but no valid broker is available. Make sure the Kafka service has started and is still running before you create the topic.
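
A quick way to rule out the "no brokers" case from PHP is to check whether anything is listening on the broker port. The sketch below assumes the broker address 127.0.0.1:9092 used throughout this article; isBrokerReachable() is a hypothetical helper built on PHP's fsockopen(), and it only tests that a TCP connection can be opened, not that the listener is a healthy Kafka broker.

```php
<?php
// Hypothetical helper: returns true when a TCP connection to host:port
// can be opened within the timeout. It does not speak the Kafka protocol.
function isBrokerReachable(string $host, int $port, float $timeoutSeconds = 2.0): bool
{
    // @ suppresses the warning fsockopen() raises when the connection fails.
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}

echo isBrokerReachable('127.0.0.1', 9092)
    ? "Broker port is open\n"
    : "Nothing is listening on 127.0.0.1:9092; start Kafka first\n";
```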

source:learnku.com