ホームページ > Java > &#&チュートリアル > Java RabbitMQ の高度な機能の例の分析

Java RabbitMQ の高度な機能の例の分析

WBOY
リリース: 2023-04-29 20:25:05
転載
910 人が閲覧しました

    メッセージの信頼性の高い配信

    RabbitMQ をメッセージ送信者として使用する場合、メッセージの損失や配信失敗のシナリオを回避したいと考えています。 RabbitMQ は、メッセージの配信信頼性モードを制御する 2 つの方法を提供します。

    • #confirm 確認モード

    • return return モード

    rabbitmq のメッセージ配信パス全体は次のとおりです:

    Producer—>rabbitmq Broker—>Exchange—>queue—>consumer

    ##プロデューサーから Exchange へのメッセージは、confirmCallback を返します
    • #交換キューからのメッセージの配信に失敗した場合は、returnCallback が返されます。

    • これら 2 つのコールバックを使用して、メッセージ配信の信頼性を制御できます。

    • 確認モード

    メッセージがプロデューサーからエクスチェンジに送信されると、confirmCallback が返されます

    Spring Integration Rabbitmq を例として取り上げ、rabbitmq 構成ファイルを変更し、パブリッシャーを追加します-connectionFactory の属性を確認し、値を true に設定します

    <!--
    * 确认模式:
    * 步骤:
    * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
    -->
    <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"/>
    ログイン後にコピー
    /*
     * 确认模式:
     * 步骤:
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
        public void queueTest(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                /**
                 *
                 * @param correlationData 相关配置信息
                 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
                 * @param cause 失败原因
                 */
                    System.out.println("confirm方法被执行了....");
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失败
                        System.out.println("接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
            //路由键与队列同名
            rabbitTemplate.convertAndSend("spring_queue", "message confirm....");
        }
    ログイン後にコピー

    メッセージはキューに正常に送信されるため、返される原因値は空です。例外が発生した場合は、原因値が返されます。異常な理由です

    Java RabbitMQ の高度な機能の例の分析Return Mode

    Exchange–>キューからのメッセージ配信が失敗した場合、returnCallback が返されます

    1. フォールバック モードをオンにします。 Publisher-returns="true"

        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-returns="true"/>
    ログイン後にコピー

    2. Exchange メッセージ処理失敗のモードを設定します: setMandatory、次に ReturnCallBack

        @Test
        public void queueTest(){
            //1.设置交换机处理失败消息的模式
            rabbitTemplate.setMandatory(true);
            //2.设置ReturnCallBack
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * @param message    消息对象
                 * @param replyCode  错误码
                 * @param replyText  错误信息
                 * @param exchange   交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String
                        replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    //处理
                }
            });
            //手动添加错误路由模拟错误发生
            rabbitTemplate.convertAndSend("spring_topic_exchange", "return123", "return message...");
        }
    ログイン後にコピー

    を設定します。エラーが発生した場合にのみメッセージが返されるため、手動でエラーを追加し、送信メッセージにルーティング値 return123 を追加しますが、実際にはそのようなルーティングは存在せず、実行して返されるメッセージは次のとおりです。

    Consumer Ack

    Java RabbitMQ の高度な機能の例の分析ack は、Acknowledge、確認を意味します。コンシューマがメッセージを受信した後の確認方法を示します。

    自動確認:acknowledge="none"

    手動確認:acknowledge=手動”
    • #異常状況に応じて確認:acknowledge="auto", (この方法は面倒で学習の必要はありません)
    • その内、自動確認とは、コンシューマがメッセージを受信すると、自動的に受信が確認され、対応するメッセージが RabbitMQ メッセージ キャッシュから削除されることを意味します。しかし、実際の業務処理では、メッセージ受信後の業務処理で例外が発生した場合、メッセージが失われる可能性が高くなります。手動確認メソッドが設定されている場合は、業務処理が成功した後に、channel.basicAck() を呼び出して手動でサインインする必要があります。例外が発生した場合は、channel.basicNack() メソッドを呼び出して、メッセージを自動的に再送信します。
    • Spring の Rabbitmq の統合を例に、rabbitmq 設定ファイルに確認方法を設定します。

      <rabbit:listener-container connection-factory="connectionFactory"
      acknowledge="manual">
      .....
      ログイン後にコピー

      監視クラスのコードは次のとおりです:
    • public class AckListener implements ChannelAwareMessageListener {
          @Override
          public void onMessage(Message message, Channel channel) throws Exception {
              long deliveryTag = message.getMessageProperties().getDeliveryTag();
              try {
                  //1.接收转换消息
                  System.out.println(new String(message.getBody()));
                  //2. 处理业务逻辑
                  System.out.println("处理业务逻辑...");
                  int i = 3/0;//出现错误
                  // 3. 手动签收
                  channel.basicAck(deliveryTag,true);
              } catch (Exception e) {
                  //e.printStackTrace();
                  //4.拒绝签收
                  /*
                   *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                   *重新发送该消息给消费端
                   */
                  channel.basicNack(deliveryTag,true,true);
                  //channel.basicReject(deliveryTag,true);
              }
          }
      }
      ログイン後にコピー
    チャネル例外のため .basicNack() メソッドが呼び出されます。メッセージが自動的に再送信されるため、出力内容は無限ループになります

    #コンシューマ側の電流制限

    Rabbitmq サーバーに数万件の未処理メッセージのバックログがある場合、コンシューマー クライアントを自由に開くと、次のような状況が発生します。大量のメッセージがプッシュされます。瞬時に完了しますが、単一のクライアントが同時にそれほど多くのデータを処理することはできません! データの量が特に大きい場合、同時実行性が非常に大きくなることもあるため、本番エンドのフローを制限することは明らかに非科学的です。また、同時実行性が非常に小さい場合もあり、生産終了を制限することはできませんが、これはユーザーの動作です。 Rabbitmqは、一定数のメッセージ(チャネルにQOS値を設定)があれば、メッセージの非自動確認を前提としたQOS(Quality of Service)機能を提供します。またはコンシューマ)は前に確認されていないため、新しいメッセージを消費しません。 Java RabbitMQ の高度な機能の例の分析

    1. ACK メカニズムが手動確認であることを確認します

    2. リスナー コンテナー構成属性 perfetch = 1 は、手動確認が行われるまで、コンシューマーが毎回消費するために mq からメッセージをプルすることを意味します消費が完了すると、次のメッセージのプルが続行されます。

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
            <rabbit:listener ref="topicListenerACK" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>
    ログイン後にコピー
    Java RabbitMQ の高度な機能の例の分析プロデューサー、5 つのメッセージを送信します

        @Test
        public void topicTest(){
    /**
     * 参数1:交换机名称
     * 参数2:路由键名
     * 参数3:发送的消息内容
     */
            for (int i=0;i<5;i++){
                rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.a", "发送到spring_topic_exchange交换机xzk.cn的消息"+i);
            }
        }
    }
    ログイン後にコピー

    プロデューサーは、channel.basicAck(deliveryTag,true) をコメントアウトし、メッセージの受信を確認しません

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                // 3. 手动签收
                //channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒绝签收
                /*
                 *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                 *重新发送该消息给消费端
                 */
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }
    ログイン後にコピー

    コンシューマーを次で開始します今回は、プロデューサーを再度実行した後、コンシューマーが 5 つのメッセージを送信したことがわかりました。実際には、プロデューサーが受信したメッセージは 1 つだけで、電流制限効果に達しました。 Rabbitmq コンソールを確認したところ、unack メッセージが 1 つあることがわかりました。 4 つの準備完了メッセージがまだ消費者に届いていません。これは、設定した prefetchCount=1 という現在の制限状況と一致しています。

    Java RabbitMQ の高度な機能の例の分析

    把channel.basicAck(deliveryTag,true)的注释取消掉,即可以自动确认收到消息,重新运行消费者,接收到了另外的四条消息

    Java RabbitMQ の高度な機能の例の分析

    Java RabbitMQ の高度な機能の例の分析

    TTL(Time To Live)

    Time To Live,消息过期时间设置

    设置某个队列为过期队列

    设置交换机,队列以及队列过期时间为10000ms

     <!--ttl-->
        <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <rabbit:topic-exchange name="test_exchange_ttl">
            <rabbit:bindings>
                <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    ログイン後にコピー

    生产者发送10条消息

        @Test
        public void testTtl() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...");
            }
    ログイン後にコピー

    Java RabbitMQ の高度な機能の例の分析

    十秒钟后,过期消息消失

    Java RabbitMQ の高度な機能の例の分析

    设置单独某个消息过期

    设置交换机和队列

    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"/>
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>     
        </rabbit:bindings>
    </rabbit:topic-exchange>
    ログイン後にコピー

    生产者发送特定过期消息,用到了MessagePostProcessor这个api

     @Test
        public void testTtl() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //1.设置message信息
                    message.getMessageProperties().setExpiration("5000");//消息的过期时间
                    //2.返回该消息
                    return message;
                }
            };
            //消息单独过期
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor);
        }
    ログイン後にコピー

    Java RabbitMQ の高度な機能の例の分析

    5s之后

    Java RabbitMQ の高度な機能の例の分析

    注:

    1.如果同时设置队列过期和消息过期,系统会根据哪个过期的时间短而选用哪儿个。

    2.设置单独消息过期时,如果该消息不为第一个接受的消息,则不过期。

    死信队列

    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Deadmessage后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    Java RabbitMQ の高度な機能の例の分析

    消息成为死信的三种情况:

    • 队列消息长度到达限制;

    • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

    • 原队列存在消息过期设置,消息到达超时时间未被消费;

    队列绑定死信交换机:

    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

    Java RabbitMQ の高度な機能の例の分析

    实现

    1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)

    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    ログイン後にコピー

    2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)

    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    ログイン後にコピー

    3.生产端测试

    /**
    * 发送测试死信消息:
    * 1. 过期时间
    * 2. 长度限制
    * 3. 消息拒收
    */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        //2. 测试长度限制后,消息死信
        /* for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        }*/
        //3. 测试消息拒收
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    }
    ログイン後にコピー

    4.消费端监听

    public class DlxListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 3/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出现异常,拒绝接受");
                //4.拒绝签收,不重回队列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    ログイン後にコピー
    <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">
    </rabbit:listener>
    ログイン後にコピー

    延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。c

    需求:

    1.下单后,30分钟未支付,取消订单,回滚库存。

    2.新用户注册成功7天后,发送短信问候。

    实现方式:

    • 定时器

    • 延迟队列

    定时器的实现方式不够优雅,我们采取延迟队列的方式

    Java RabbitMQ の高度な機能の例の分析

    不过很可惜,在RabbitMQ中并未提供延迟队列功能。

    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

    Java RabbitMQ の高度な機能の例の分析

    配置

    <!--
    延迟队列:
            1. 定义正常交换机(order_exchange)和队列(order_queue)
            2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
            3. 绑定,设置正常队列过期时间为30分钟
    -->
    <!-- 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
    <!-- 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    ログイン後にコピー

    生产端测试

    @Test
    public void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
        /*//2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }*/
    }
    ログイン後にコピー

    消费端监听

    public class OrderListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
    		long deliveryTag = message.getMessageProperties().getDeliveryTag();
    		try {
    			//1.接收转换消息
    			System.out.println(new String(message.getBody()));
    			//2. 处理业务逻辑
    			System.out.println("处理业务逻辑...");
    			System.out.println("根据订单id查询其状态...");
    			System.out.println("判断状态是否为支付成功");
    			System.out.println("取消订单,回滚库存....");
    			//3. 手动签收
    			channel.basicAck(deliveryTag,true);
    		} catch (Exception e) {
    			//e.printStackTrace();
    			System.out.println("出现异常,拒绝接受");
    			//4.拒绝签收,不重回队列 requeue=false
    			channel.basicNack(deliveryTag,true,false);
    		}
    	}
    }
    ログイン後にコピー
    <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">
    </rabbit:listener>
    ログイン後にコピー

    以上がJava RabbitMQ の高度な機能の例の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    関連ラベル:
    ソース:yisu.com
    このウェブサイトの声明
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
    人気のチュートリアル
    詳細>
    最新のダウンロード
    詳細>
    ウェブエフェクト
    公式サイト
    サイト素材
    フロントエンドテンプレート