Maison > développement back-end > tutoriel php > Utilisation de Kafka dans la file d'attente de messages PHP

Utilisation de Kafka dans la file d'attente de messages PHP

Guanhui
Libérer: 2023-04-08 16:42:01
avant
5397 Les gens l'ont consulté

Utilisation de Kafka dans la file d'attente de messages PHP

Installez le service Kafka

Allez directement sur le site officiel de Kafka et téléchargez la dernière

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
Copier après la connexion

Décompressez et entrez dans le répertoire

tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
Copier après la connexion

Démarrez le service Kafka

Utilisez le script du package d'installation pour démarrer un Zookeeper à nœud unique instance

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Copier après la connexion

Utilisez kafka -server-start.sh Démarrez le service kafka

bin/kafka-server-start.sh config/server.properties
Copier après la connexion

Créer un sujet

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Copier après la connexion

Affichez la liste des sujets et vérifiez si la création est réussi

bin/kafka-topics.sh --list --zookeeper localhost:2181
$ test
Copier après la connexion

Producteur, envoyer des News

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
Copier après la connexion

C'est tout pour le service, la prochaine chose c'est php.

Installer l'extension PHP

L'installation de rdkafka dépend de librdkafka, alors installez d'abord librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make && make install
Copier après la connexion

Installez l'extension php-rdkafka

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/Cellar/php@7.2/7.2.24/bin/php-config  ## 这里根据自己的情况填写路径
make && make install
Copier après la connexion

Ajoutez

extension=rdkafka.so
Copier après la connexion

à php-ini, redémarrez php-fpm et vous devriez pouvoir voir l'extension.

Créer une classe de producteur à l'aide de Kafka

<?php
class KafkaProducer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
    public static function send($message, $topic)
    {
        self::producer($message, $topic);
    }
    public static function producer($message, $topic = &#39;test&#39;)
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;metadata.broker.list&#39;, self::$brokerList);
        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic($topic);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
        $producer->poll(0);
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException(&#39;Was unable to flush, messages might be lost!&#39;);
        }
    }
}
Copier après la connexion

Créer une classe de consommateur

<?php
class KafkaConsumer
{
    public static $brokerList = &#39;127.0.0.1:9092&#39;;
      public static function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set(&#39;group.id&#39;, &#39;test&#39;);
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers("127.0.0.1");
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set(&#39;auto.commit.interval.ms&#39;, 100);
        $topicConf->set(&#39;offset.store.method&#39;, &#39;broker&#39;);
        $topicConf->set(&#39;auto.offset.reset&#39;, &#39;smallest&#39;);
        $topic = $rk->newTopic(&#39;test&#39;, $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        while (true) {
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    var_dump($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }
}
Copier après la connexion

Résumé du problème

1. Aucun runtime Java présent, demande d'installation

Étant donné que kafka nécessite la prise en charge de l'environnement Java, l'environnement Java est installé. Vous pouvez aller sur javase-jdk14-downloads pour choisir votre propre version à télécharger et installer

2. Créez un sujet et il apparaîtra : Facteur de réplication : 1 plus grand que les courtiers disponibles : 0

Cela signifie qu'il y a au moins un courtier, c'est-à-dire qu'il n'y a pas de courtier valide disponible. Vous devez vous assurer que votre kafka a bien été démarré

Tutoriel recommandé : "Tutoriel PHP"

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:learnku.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal