Teach you how to quickly install php+kafka

慕斯
Release: 2023-04-10 09:52:01

This article shows how to quickly set up PHP with Kafka: Java, the Kafka broker, the librdkafka C library, and the php-rdkafka extension. If you have not done this before, follow the steps below.

1. Install Java and set the relevant environment variables

> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> mv java-se-7u75-ri/ /opt/
> export JAVA_HOME=/opt/java-se-7u75-ri
> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
> export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar

# Verify the installation
> java -version
openjdk version "1.7.0_75"
OpenJDK Runtime Environment (build 1.7.0_75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
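
The export commands above only affect the current shell session. Below is a minimal sketch of one way to persist them. The helper name write_java_env is hypothetical, and the profile file you pass in (for example /etc/profile.d/java.sh) is an assumption about your distribution, not part of the original steps:

```shell
#!/bin/sh
# Hypothetical helper: append the JAVA_HOME, PATH and CLASSPATH exports to a
# profile script so that new login shells pick them up.
#   $1 = Java install directory, $2 = profile file to append to
write_java_env() {
    java_home=$1
    profile=$2
    # \$ keeps the variable references literal so they expand at login time.
    cat >> "$profile" <<EOF
export JAVA_HOME=$java_home
export PATH=\$PATH:\$JAVA_HOME/bin:\$JAVA_HOME/jre/bin
export CLASSPATH=.:\$JAVA_HOME/lib:\$JAVA_HOME/lib/tools.jar
EOF
}
```

For example, run write_java_env /opt/java-se-7u75-ri /etc/profile.d/java.sh as root, then open a new login shell and check java -version again.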

2. Install Kafka. This example uses version 0.10.2.

> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> tar zxvf kafka_2.11-0.10.2.0.tgz
> mv kafka_2.11-0.10.2.0/ /opt/kafka
> cd /opt/kafka

# Start ZooKeeper (it runs in the foreground, so use a separate terminal for the next steps)
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

# Start Kafka
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

# Try creating a topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# Write messages with the console producer
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

# Read the messages with the console consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
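
The archive file name encodes two versions: the Scala version the broker was built with (2.11) and the Kafka version (0.10.2.0). Here is a small sketch, using a hypothetical helper kafka_url, of how the download URL used above is put together if you want to try a different release. Whether a given combination actually exists on the archive server is something to check before downloading:

```shell
#!/bin/sh
# Hypothetical helper: build the archive.apache.org download URL for a
# Scala/Kafka version pair, following the naming pattern used in this step.
#   $1 = Scala version (e.g. 2.11), $2 = Kafka version (e.g. 0.10.2.0)
kafka_url() {
    scala_version=$1
    kafka_version=$2
    printf 'http://archive.apache.org/dist/kafka/%s/kafka_%s-%s.tgz\n' \
        "$kafka_version" "$scala_version" "$kafka_version"
}
```

With this in place, wget "$(kafka_url 2.11 0.10.2.0)" fetches the same archive as the wget command in this step.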

3. Install librdkafka, the C client library for Kafka

> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz
> tar zxvf v1.3.0.tar.gz
> cd librdkafka-1.3.0
> ./configure
> make && make install
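
If PHP later fails to load the extension because it cannot find librdkafka, it is worth checking where make install put the shared library. The sketch below assumes the default install prefix of /usr/local and uses a hypothetical helper has_librdkafka. Adjust the directory if you passed --prefix to ./configure:

```shell
#!/bin/sh
# Hypothetical helper: succeed if a librdkafka shared object exists in the
# given directory (librdkafka.so, librdkafka.so.1, ...).
#   $1 = library directory to inspect
has_librdkafka() {
    libdir=$1
    for f in "$libdir"/librdkafka.so*; do
        # When nothing matches, the glob stays literal and -e fails.
        [ -e "$f" ] && return 0
    done
    return 1
}
```

For example, has_librdkafka /usr/local/lib || echo "librdkafka not found in /usr/local/lib". On many Linux systems, running ldconfig as root after the install refreshes the dynamic linker cache so that the new library can be found.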

4. Install the Kafka extension for PHP. This example uses the php-rdkafka extension: https://github.com/arnaud-lb/php-rdkafka

> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
> tar zxvf 4.0.2.tar.gz
> cd php-rdkafka-4.0.2
> /opt/php7/bin/phpize
> ./configure --with-php-config=/opt/php7/bin/php-config
> make && make install

Then edit php.ini and add the line extension=rdkafka.so so that PHP loads the extension.
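
The php.ini change can also be scripted. Below is a minimal sketch that uses a hypothetical helper enable_rdkafka. It appends the line only when no active (uncommented) rdkafka extension line is present, so it is safe to run more than once. The path of your php.ini is an assumption you need to fill in yourself:

```shell
#!/bin/sh
# Hypothetical helper: make sure php.ini contains an active
# "extension=rdkafka.so" line, without adding a duplicate.
#   $1 = path to php.ini
enable_rdkafka() {
    ini=$1
    # Lines starting with ';' are comments in php.ini and do not match.
    if ! grep -Eq '^[[:space:]]*extension[[:space:]]*=[[:space:]]*rdkafka' "$ini" 2>/dev/null; then
        printf 'extension=rdkafka.so\n' >> "$ini"
    fi
}
```

/opt/php7/bin/php --ini shows which php.ini is loaded. After calling enable_rdkafka on that file, /opt/php7/bin/php -m | grep rdkafka should print rdkafka if the extension loads correctly.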

5. Install the IDE code-completion stubs for rdkafka

> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs

Taking PhpStorm as an example: in your project, right-click External Libraries, select Configure PHP Include Paths, and add the path of the php-rdkafka-stubs directory created by the command above.

6. Write PHP test code

producer:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_ERR);
$conf->set('debug', 'admin');
$conf->set('metadata.broker.list', 'localhost:9092');

//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic("test2");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

low-level consumer:

<?php

$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_ERR);
$conf->set('debug', 'admin');

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);

// Set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Alternatively, set the offset store method to 'none'
// $topicConf->set('offset.store.method', 'none');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test2", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            print_r($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

high-level consumer:

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$conf->set('auto.offset.reset', 'smallest');

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test2'
$consumer->subscribe(['test2']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            print_r($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
    sleep(2);
}

source:csdn.net