目次
メッセージが失われないようにする方法
メッセージ配信の信頼性とは
##デッドレターキューとは
##何メッセージがデッドレターになる状況
消費者がメッセージを拒否
ホームページ Java &#&チュートリアル SpringBoot が RabbitMQ を統合して遅延キューを実装する方法

SpringBoot が RabbitMQ を統合して遅延キューを実装する方法

May 16, 2023 pm 08:31 PM
springboot rabbitmq

    メッセージが失われないようにする方法

    rabbitmq メッセージ配信パス

    Producer->Switch-> Queue->Consumer

    一般に、これは 3 つの段階に分かれています。

    • 1. プロデューサは、メッセージ配信の信頼性を保証します。

    • 2.mq 内部メッセージは失われません。

    • 3. 消費者消費は成功しています。

    メッセージ配信の信頼性とは

    簡単に言うと、メッセージは 100% メッセージ キューに送信されます。

    confirmCallback をオンにすることができます

    プロデューサーがメッセージを配信した後、mq はプロデューサーに確認応答を返します。確認応答に基づいて、プロデューサーはメッセージが mq に送信されたかどうかを確認できます。

    ##confirmCallback を開く

    構成ファイルを変更します

    #NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法
    spring:
      rabbitmq:
        publisher-confirm-type: correlated
    ログイン後にコピー

    テスト コード

    @Test  
    public void testConfirmCallback() throws InterruptedException {  
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  
        /**  
        *  
        * @param correlationData 配置  
        * @param ack 交换机是否收到消息,true是成功,false是失败  
        * @param cause 失败的原因  
        */  
        @Override  
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
            System.out.println("confirm=====>");  
            System.out.println("confirm==== ack="+ack);  
            System.out.println("confirm==== cause="+cause);  
            //根据ACK状态做对应的消息更新操作 TODO  
        }  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");  
        Thread.sleep(10000);  
    }
    ログイン後にコピー

    returnCallback を使用して、メッセージがエクスチェンジャからキューに正常に送信されたことを確認します。構成ファイルを変更します

    spring:
      rabbitmq:
        #开启returnCallback
        publisher-returns: true
        #交换机处理消息到路由失败,则会返回给生产者
        template:
          mandatory: true
    ログイン後にコピー

    テスト コード

    @Test  
    void testReturnCallback() {  
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
            int code = returned.getReplyCode();  
            System.out.println("code="+code);  
            System.out.println("returned="+ returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback");  
    }
    ログイン後にコピー

    コンシューマがメッセージを消費するとき、ack を通じてメッセージが消費されたことを手動で確認する必要があります。

    構成ファイルを変更する

    spring:
      rabbitmq:
        listener:  
          simple:  
            acknowledge-mode: manual
    ログイン後にコピー

    テスト コードを作成する

    @RabbitHandler  
    public void consumer(String body, Message message, Channel channel) throws IOException {  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msgTag="+msgTag);  
        System.out.println("message="+ message);  
        System.out.println("body="+body);  
    
        //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除  
        channel.basicAck(msgTag,false);  
        // channel.basicNack(msgTag,false,true);  
      
    }
    ログイン後にコピー

    deliveryTags はメッセージ配信シーケンス番号です。メッセージが消費されるか、メッセージが再配信されるたびに、 deliveryTag が増加します

    ttlデッドレターキュー

    ##デッドレターキューとは

    ##期限内に消費されないメッセージが保存されるキュー

    ##何メッセージがデッドレターになる状況

    消費者がメッセージを拒否

    (basic.reject/basic.nack)
      し、再度キューに入れない
    • requeue= false

      #メッセージはキュー内で消費されておらず、キューまたはメッセージ自体の有効期限を超えていますTTL (存続時間)

    • キューのメッセージの長さが制限に達しました

    • 結果: メッセージがデッドレターになった後、キューがバインドされている場合デッド レター スイッチに送信すると、メッセージはデッド レター スイッチによってデッド レター キューに再ルーティングされます。

    • デッド レター キューは、遅延キューの消費によく使用されます。

      遅延キュー
    プロデューサーは、このメッセージが mq に配信されたときにすぐに消費されることを期待しませんが、消費する前に一定期間待機します。

    springboot は、rabbitmq を統合して、タイムアウト時の注文の自動シャットダウンを実現します

    package com.fandf.test.rabbit;  
      
    import org.springframework.amqp.core.*;  
    import org.springframework.beans.factory.annotation.Qualifier;  
    import org.springframework.context.annotation.Bean;  
    import org.springframework.context.annotation.Configuration;  
      
    import java.util.HashMap;  
    import java.util.Map;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:38  
    */  
    @Configuration  
    public class RabbitMQConfig {  
      
        /**  
        * 订单交换机  
        */  
        public static final String ORDER_EXCHANGE = "order_exchange";  
        /**  
        * 订单队列  
        */  
        public static final String ORDER_QUEUE = "order_queue";  
        /**  
        * 订单路由key  
        */  
        public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  
    
        /**  
        * 死信交换机  
        */  
        public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";  
        /**  
        * 死信队列 routingKey  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  
    
        /**  
        * 死信队列  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  
    
    
        /**  
        * 创建死信交换机  
        */  
        @Bean("orderDeadLetterExchange")  
        public Exchange orderDeadLetterExchange() {  
            return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建死信队列  
        */  
        @Bean("orderDeadLetterQueue")  
        public Queue orderDeadLetterQueue() {  
            return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();  
        }  
    
        /**  
        * 绑定死信交换机和死信队列  
        */  
        @Bean("orderDeadLetterBinding")  
        public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();  
        }  
    
    
        /**  
        * 创建订单交换机  
        */  
        @Bean("orderExchange")  
        public Exchange orderExchange() {  
            return new TopicExchange(ORDER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 创建订单队列  
        */  
        @Bean("orderQueue")  
        public Queue orderQueue() {  
            Map<String, Object> args = new HashMap<>(3);  
            //消息过期后,进入到死信交换机  
            args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  
    
            //消息过期后,进入到死信交换机的路由key  
            args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  
    
            //过期时间,单位毫秒  
            args.put("x-message-ttl", 10000);  
    
            return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();  
        }  
    
        /**  
        * 绑定订单交换机和队列  
        */  
        @Bean("orderBinding")  
        public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();  
        }  
      
      
    }
    ログイン後にコピー

    Consumer

    package com.fandf.test.rabbit;  
      
    import cn.hutool.core.date.DateUtil;  
    import com.rabbitmq.client.Channel;  
    import org.springframework.amqp.core.Message;  
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    import org.springframework.stereotype.Component;  
      
    import java.io.IOException;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:42  
    */  
    @Component  
    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  
    public class OrderMQListener {  
      
      
      
        @RabbitHandler  
        public void consumer(String body, Message message, Channel channel) throws IOException {  
            System.out.println("收到消息:" + DateUtil.now());  
            long msgTag = message.getMessageProperties().getDeliveryTag();  
            System.out.println("msgTag=" + msgTag);  
            System.out.println("message=" + message);  
            System.out.println("body=" + body);  
            channel.basicAck(msgTag, false);  
        }  
      
    }
    ログイン後にコピー

    テストクラス

    @Test  
    void testOrder() throws InterruptedException {  
    //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定  
        rabbitTemplate.setMandatory(true);  
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
        int code = returned.getReplyCode();  
        System.out.println("code=" + code);  
        System.out.println("returned=" + returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");  
        System.out.println("发送消息:" + DateUtil.now());  
        Thread.sleep(20000);  
    }
    ログイン後にコピー
    プログラム出力

    メッセージ送信: 2023-04-16 15:14:34

    メッセージ受信: 2023-04-16 15:14:44

    msgTag=1

    message=(本文: 'テスト注文の遅延' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1,exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023、routing-keys=[order]、queue=order_queue}]、x-first-death-reason=expired、x-first-death-queue=order_queue}、contentType=text/plain、contentEncoding = UTF-8、contentLength=0、receiveddeliveryMode=PERSISTENT、priority=0、redelivered=false、receivedExchange=order_dead_letter_exchange、receivedRoutingKey=order_dead_letter_queue_routing_key、deliveryTag=1、consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ、consumerQueue=order_dead_letter_queue] )
    体= テスト注文の遅延


    以上がSpringBoot が RabbitMQ を統合して遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    このウェブサイトの声明
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

    ホットAIツール

    Undresser.AI Undress

    Undresser.AI Undress

    リアルなヌード写真を作成する AI 搭載アプリ

    AI Clothes Remover

    AI Clothes Remover

    写真から衣服を削除するオンライン AI ツール。

    Undress AI Tool

    Undress AI Tool

    脱衣画像を無料で

    Clothoff.io

    Clothoff.io

    AI衣類リムーバー

    AI Hentai Generator

    AI Hentai Generator

    AIヘンタイを無料で生成します。

    ホットツール

    メモ帳++7.3.1

    メモ帳++7.3.1

    使いやすく無料のコードエディター

    SublimeText3 中国語版

    SublimeText3 中国語版

    中国語版、とても使いやすい

    ゼンドスタジオ 13.0.1

    ゼンドスタジオ 13.0.1

    強力な PHP 統合開発環境

    ドリームウィーバー CS6

    ドリームウィーバー CS6

    ビジュアル Web 開発ツール

    SublimeText3 Mac版

    SublimeText3 Mac版

    神レベルのコード編集ソフト(SublimeText3)

    React と RabbitMQ を使用して信頼性の高いメッセージング アプリを構築する方法 React と RabbitMQ を使用して信頼性の高いメッセージング アプリを構築する方法 Sep 28, 2023 pm 08:24 PM

    React と RabbitMQ を使用して信頼性の高いメッセージング アプリケーションを構築する方法 はじめに: 最新のアプリケーションは、リアルタイム更新やデータ同期などの機能を実現するために、信頼性の高いメッセージングをサポートする必要があります。 React はユーザー インターフェイスを構築するための人気のある JavaScript ライブラリであり、RabbitMQ は信頼性の高いメッセージング ミドルウェアです。この記事では、React と RabbitMQ を組み合わせて信頼性の高いメッセージング アプリケーションを構築する方法を紹介し、具体的なコード例を示します。 RabbitMQ の概要:

    RabbitMQ を使用して PHP で分散メッセージ処理を実装する方法 RabbitMQ を使用して PHP で分散メッセージ処理を実装する方法 Jul 18, 2023 am 11:00 AM

    RabbitMQ を使用して PHP で分散メッセージ処理を実装する方法 はじめに: 大規模なアプリケーション開発では、分散システムが一般的な要件になっています。分散メッセージ処理は、タスクを複数の処理ノードに分散することでシステムの効率と信頼性を向上させるパターンです。 RabbitMQ は、AMQP プロトコルを使用してメッセージの配信と処理を実装する、オープンソースの信頼性の高いメッセージ キュー システムです。この記事では、配布のために PHP で RabbitMQ を使用する方法について説明します。

    SpringBootとSpringMVCの比較と差異分析 SpringBootとSpringMVCの比較と差異分析 Dec 29, 2023 am 11:02 AM

    SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivo​​tal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

    Go での RabbitMQ の使用: 完全ガイド Go での RabbitMQ の使用: 完全ガイド Jun 19, 2023 am 08:10 AM

    最新のアプリケーションが複雑になるにつれて、メッセージングは​​強力なツールになっています。この分野では、RabbitMQ は、異なるアプリケーション間でメッセージを配信するために使用できるメッセージ ブローカーとして非常に人気があります。この記事では、Go 言語で RabbitMQ を使用する方法を説明します。このガイドでは以下の内容について説明します: RabbitMQ の概要 RabbitMQ のインストール RabbitMQ の基本概念 Go で RabbitMQ を使用する入門 RabbitMQ と Go

    SpringBoot+Dubbo+Nacos開発実践チュートリアル SpringBoot+Dubbo+Nacos開発実践チュートリアル Aug 15, 2023 pm 04:49 PM

    この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

    Golang と RabbitMQ 間のリアルタイム データ同期のためのソリューション Golang と RabbitMQ 間のリアルタイム データ同期のためのソリューション Sep 27, 2023 pm 10:41 PM

    Golang と RabbitMQ 間のリアルタイム データ同期ソリューションの紹介: 今日の時代では、インターネットの普及とデータ量の爆発的な増加に伴い、リアルタイム データ同期の重要性がますます高まっています。非同期データ送信とデータ同期の問題を解決するために、多くの企業はメッセージ キューを使用してデータのリアルタイム同期を実現し始めています。この記事では、Golang と RabbitMQ に基づくリアルタイム データ同期ソリューションを紹介し、具体的なコード例を示します。 1. RabbitMQ とは何ですか?ラビ

    go-zeroとRabbitMQの応用実践 go-zeroとRabbitMQの応用実践 Jun 23, 2023 pm 12:54 PM

    現在、マイクロサービス アーキテクチャ モデルを採用する企業が増えており、このアーキテクチャではメッセージ キューが重要な通信手段となっており、その中でも RabbitMQ が広く使用されています。 Go 言語では、go-zero は近年登場したフレームワークであり、開発者がメッセージ キューをより簡単に使用できるようにするための実用的なツールやメソッドが数多く提供されています。以下では、実際のアプリケーションに基づいて go-zero を紹介します。とRabbitMQの応用実践。 1.RabbitMQ の概要Rabbit

    Golang RabbitMQ: 高可用性メッセージ キュー システムのアーキテクチャ設計と実装 Golang RabbitMQ: 高可用性メッセージ キュー システムのアーキテクチャ設計と実装 Sep 28, 2023 am 08:18 AM

    GolangRabbitMQ: 高可用性メッセージ キュー システムのアーキテクチャ設計と実装には、特定のコード サンプルが必要です はじめに: インターネット技術の継続的な開発とその広範な応用により、メッセージ キューは現代のソフトウェア システムに不可欠な部分になりました。メッセージ キューは、デカップリング、非同期通信、フォールト トレラント処理、その他の機能を実装するツールとして、分散システムに高可用性とスケーラビリティのサポートを提供します。 Golang は効率的で簡潔なプログラミング言語として、同時実行性とパフォーマンスの高いシステムを構築するために広く使用されています。

    See all articles