ホームページ > バックエンド開発 > PHPチュートリアル > Rabbitmq共通関数のカプセル化(php版)

Rabbitmq共通関数のカプセル化(php版)

藏色散人
リリース: 2023-04-09 21:00:02
転載
3336 人が閲覧しました

この記事はrabbitmqの共通関数のカプセル化(php版)を皆さんに紹介するもので、一定の参考価値がありますので、必要な方は参考にしていただければ幸いです。

rabbitmq がプロジェクトで広く使用されている場合、ここでは、rabbitmq の通常の機能の簡単な概要を示し、コンポーザー パッケージにカプセル化されています。コンポーザー パッケージのアドレス (https://packagist.org/packages/maweibinguo/easyrabbitmq) )、github アドレス (https://github.com/maweibinguo/easyrabbitmq)、フォークすることは大歓迎です。レベルが限られているため、バグは避けられず、貴重な意見を歓迎します。

[推奨学習: PHP ビデオ チュートリアル]

easy-rabbitmq パッケージの紹介

php-amqplib/php-amqplib パッケージの二次カプセル化。共通関数のセットを提供します。すぐに使用できる実稼働ソリューション
。現在サポートされている機能リストは次のとおりです。

  • 直接接続されたスイッチへのメッセージのプッシュ (遅延メッセージを含む)
  • セクター スイッチへのメッセージのプッシュ (遅延メッセージを含む)
  • トピック スイッチへのメッセージのプッシュ (遅延メッセージを含む)
  • サブスクリプション モードでの信頼性の高い消費。コンシューマーは消費に失敗した後、最大 5 回まで消費を継続しようとします。
  • プル モードでの信頼性の高い消費。消費者は失敗後も最大 5 回まで消費を継続しようとします。

他のシナリオがある場合は、引き続き追加して後で繰り返してください。 !

要件

  • インストール パッケージの PHP バージョン要件は、主に php-amqplib/php-amqplib パッケージ自体の要件に依存します。 php5.0 を使用します php-amqplib/php-amqplib パッケージのバージョン V2.9.0 がインストールされています。
    具体的な要件については、こちら (https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0) を参照してください。
    ただし、作者は php7.0 以降の使用を推奨しており、この開発パッケージもバージョン 7.0 で開発されています。

インストール

      composer require maweibinguo/easyrabbitmq
ログイン後にコピー

使用

ここでは、消費プロセスの信頼性を確保し、ワーカーの消費能力を高めるために、php スクリプト スーパーバイザを組み合わせて使用​​することをお勧めします。 !スーパーバイザーについて聞いたことがない場合は、ここ (http://www.supervisord.org/introduction.html) をクリックして詳細をご覧ください。

1. プッシュ メッセージ

1- 1. 直接接続されたスイッチ

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
      );
ログイン後にコピー

1-2にメッセージをプッシュし、セクタースイッチ

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange", //交换机名称
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange" //交换机名称
      );
ログイン後にコピー

1-3にメッセージをプッシュし、トピックスイッチ

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        /**
                         * routingKey 要同consum(get)方法的bindingKey相匹配
                         * bindingKey支持两种特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一个单词、“#”用于匹配多个单词(也可以是0个)
                         * 无论是bindingKey还是routingKey, 被"."分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 包含三个单词easy、topic、queue
                         */
                        $routingKey = "easy.topic.queue",
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        $routingKey = "easy.topic.queue"    
      );
ログイン後にコピー

2にメッセージをプッシュし、消費しますmessage

Consumption は自動再試行をサポートしており、最大 5 回の再試行が可能です。各消費が失敗すると、メッセージは消費キューに戻されます。失敗回数が増えると再試行時間は徐々に経過します。このクライアントがサポートする遅延戦略は次のとおりです:
失敗 1 回 (1 秒後に再度配信されます)、失敗 2 回 (再度配信されます) 2 秒後) 配信)、3 回失敗 (4 秒後に再度配信)、4 回失敗 (8 秒後に再度配信)、5 回失敗 (16 秒後に再度配信)

2-1. サブスクリプション モード

サブスクリプション モードで確実に消費する
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
            $queueName = "direct_test_queue",//订阅的队列名称
            $consumerTag = "c1",//消费标记
            $exchange = "easy_direct_exchange",//交换机名称
            $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机需要同routingKey保持一致
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = "easymq@failed"
      );
ログイン後にコピー

2-2. プル モード

プル モードで確実に消費する
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->get(
            $queue = "get_queue",
            $exchange = "easy_fanout_exchange",
            $bindingKey = "",
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = 'easymq@failed'
      );
ログイン後にコピー

以上がRabbitmq共通関数のカプセル化(php版)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:segmentfault.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート