Encapsulation de fonctions communes RabbitMQ (version php)

藏色散人
Libérer: 2023-04-09 21:00:02
avant
3270 Les gens l'ont consulté

Cet article présente l'encapsulation de fonctions communes de Rabbitmq (version php) à tout le monde. Il a une certaine valeur de référence. Les amis qui en ont besoin peuvent s'y référer.

Lorsque RabbitMQ a été largement utilisé dans le projet, voici un résumé simple des fonctions générales de RabbitMQ, et encapsulé dans un package composer, adresse du package composer (https://packagist.org/packages/ maweibinguo /easyrabbitmq), adresse github (https://github.com/maweibinguo/easyrabbitmq), bienvenue sur fork, en raison du niveau limité, il y aura inévitablement des bugs, bienvenue pour donner des avis précieux

[Apprentissage recommandé : Tutoriel vidéo PHP

Introduction au package easy-rabbitmq

Une encapsulation secondaire du package php-amqplib/php-amqplib, fournissant un ensemble de fonctions communes qui peuvent être utilisées avec des solutions de production prêtes à l'emploi
. La liste des fonctions actuellement prises en charge est la suivante :

  • Push des messages vers des commutateurs directement connectés (y compris les messages retardés)
  • Push des messages vers des commutateurs de secteur (y compris les messages retardés)
  • Poussez les messages vers le commutateur de sujet (y compris les messages retardés)
  • Consommation fiable en mode abonnement Une fois que le consommateur ne parvient pas à consommer, il essaiera de continuer à consommer, jusqu'à 5 tentatives.
  • Consommation fiable en mode pull, le consommateur tentera de poursuivre la consommation après échec, jusqu'à 5 tentatives.

S'il y a d'autres scènes, continuez à les ajouter et répétez plus tard ! !

Exigences

  • Les exigences de la version PHP du package d'installation dépendent principalement des exigences du package php-amqplib/php-amqplib lui-même afin de prendre en compte les utilisateurs de. php5.0, nous utilisons La version du package php-amqplib/php-amqplib V2.9.0 a été installée.
    Pour les exigences spécifiques, veuillez vous référer ici (https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0).
    Cependant, l'auteur recommande d'utiliser php7.0 et supérieur. Ce package de développement est également développé sur la version 7.0 !

Installer

      composer require maweibinguo/easyrabbitmq
Copier après la connexion

Utiliser

Ici, nous recommandons la combinaison script php + superviseur pour assurer la fiabilité du processus de consommation et améliorer la capacité de consommation du travailleur ! Si vous n'avez pas entendu parler de superviseur, vous pouvez cliquer ici (http://www.supervisord.org/introduction.html) pour en savoir plus.

1- Message push

1-. 1. Poussez le message vers le commutateur directement connecté

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
      );
Copier après la connexion

1-2. Poussez le message vers le commutateur de secteur

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange", //交换机名称
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange" //交换机名称
      );
Copier après la connexion

1-3.Poussez le message vers le commutateur de sujet

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        /**
                         * routingKey 要同consum(get)方法的bindingKey相匹配
                         * bindingKey支持两种特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一个单词、“#”用于匹配多个单词(也可以是0个)
                         * 无论是bindingKey还是routingKey, 被"."分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 包含三个单词easy、topic、queue
                         */
                        $routingKey = "easy.topic.queue",
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        $routingKey = "easy.topic.queue"    
      );
Copier après la connexion
<.>2. Consommer le message

La consommation prend en charge les nouvelles tentatives automatiques, avec un maximum de 5 tentatives après chaque échec de consommation, le message sera remis dans la file d'attente de consommation. Le temps de nouvelle tentative passera progressivement à mesure que le nombre d'échecs augmente. La stratégie de relocalisation prise en charge par ce client est la suivante :

Échec une fois (il sera à nouveau délivré après 1 seconde), échec deux fois (il sera à nouveau délivré après 2 secondes). secondes) livré), a échoué 3 fois (sera à nouveau livré après 4 secondes), a échoué 4 fois (sera à nouveau livré après 8 secondes), a échoué 5 fois (sera à nouveau livré après 16 secondes)

2 -1. Mode abonnement

Consommation fiable en mode abonnement
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
            $queueName = "direct_test_queue",//订阅的队列名称
            $consumerTag = "c1",//消费标记
            $exchange = "easy_direct_exchange",//交换机名称
            $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机需要同routingKey保持一致
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = "easymq@failed"
      );
Copier après la connexion
2-2. Mode pull

Consommation fiable en mode pull
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->get(
            $queue = "get_queue",
            $exchange = "easy_fanout_exchange",
            $bindingKey = "",
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = 'easymq@failed'
      );
Copier après la connexion
.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:segmentfault.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal