PHP-Nachrichtenwarteschlange Kafka-Nutzung

Guanhui
Freigeben: 2023-04-08 16:42:01
nach vorne
5349 Leute haben es durchsucht

PHP-Nachrichtenwarteschlange Kafka-Nutzung

Installieren Sie den Kafka-Dienst

Gehen Sie direkt zur offiziellen Kafka-Website und laden Sie die neueste Version herunter

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
Nach dem Login kopieren

Entpacken Sie die Datei und geben Sie das Verzeichnis ein

tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
Nach dem Login kopieren

Starten Sie den Kafka-Dienst

Verwenden Sie das Skript im Installationspaket, um einen Zookeeper mit einem Knoten zu starten Instanz

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Nach dem Login kopieren

Verwenden Sie kafka -server-start.sh Starten Sie den Kafka-Dienst

bin/kafka-server-start.sh config/server.properties
Nach dem Login kopieren

Thema erstellen

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Nach dem Login kopieren

Sehen Sie sich die Themenliste an und prüfen Sie, ob die Erstellung erfolgt ist ist erfolgreich

bin/kafka-topics.sh --list --zookeeper localhost:2181
$ test
Nach dem Login kopieren

Produzent, sende News

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
Nach dem Login kopieren

Das war's für den Service, als nächstes kommt PHP.

PHP-Erweiterung installieren

Die Installation von rdkafka hängt von librdkafka ab, also installieren Sie zuerst librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install
Nach dem Login kopieren

Installieren Sie die php-rdkafka-Erweiterung

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径
make && make install
Nach dem Login kopieren

Fügen Sie

extension=rdkafka.so
Nach dem Login kopieren

zu php-ini hinzu, starten Sie php-fpm neu und Sie sollten die Erweiterung sehen können.

Erstellen Sie eine Produzentenklasse mit Kafka

<?php
class KafkaProducer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
    public static function send($message, $topic)
    {
        self::producer($message, $topic);
    }
    public static function producer($message, $topic = &#39;test&#39;)
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;metadata.broker.list&#39;, self::$brokerList);
        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topic);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
        $producer->poll(0);
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException(&#39;Was unable to flush, messages might be lost!&#39;);
        }
    }
}
Nach dem Login kopieren

Erstellen Sie eine Verbraucherklasse

<?php
class KafkaConsumer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
      public static function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;group.id&#39;, &#39;test&#39;);
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers("127.0.0.1");
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set(&#39;auto.commit.interval.ms&#39;, 100);
        $topicConf->set(&#39;offset.store.method&#39;, &#39;broker&#39;);
        $topicConf->set(&#39;auto.offset.reset&#39;, &#39;smallest&#39;);
        $topic = $rk->newTopic(&#39;test&#39;, $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        while (true) {
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }
}
Nach dem Login kopieren

Problemzusammenfassung

1. Keine Java-Laufzeit vorhanden, Installation wird angefordert

Da Kafka Java-Umgebungsunterstützung erfordert, wird die Java-Umgebung installiert. Sie können zu javase-jdk14-downloads gehen, um Ihre eigene Version zum Herunterladen und Installieren auszuwählen

2. Erstellen Sie ein Thema und es wird angezeigt: Replikationsfaktor: 1 größer als verfügbare Broker: 0

Das bedeutet, dass es mindestens einen Broker gibt. Das heißt, es sind keine gültigen Broker verfügbar. Sie müssen sicherstellen, dass Ihr Kafka gestartet wurde

Empfohlenes Tutorial: „PHP-Tutorial

Das obige ist der detaillierte Inhalt vonPHP-Nachrichtenwarteschlange Kafka-Nutzung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:learnku.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage