SpringBoot が RabbitMQ を統合して遅延キューを実装する方法
メッセージが失われないようにする方法
rabbitmq メッセージ配信パス
Producer->Switch-> Queue->Consumer
一般に、これは 3 つの段階に分かれています。
1. プロデューサは、メッセージ配信の信頼性を保証します。
2.mq 内部メッセージは失われません。
3. 消費者消費は成功しています。
メッセージ配信の信頼性とは
簡単に言うと、メッセージは 100% メッセージ キューに送信されます。
confirmCallback をオンにすることができます
プロデューサーがメッセージを配信した後、mq はプロデューサーに確認応答を返します。確認応答に基づいて、プロデューサーはメッセージが mq に送信されたかどうかを確認できます。
##confirmCallback を開く構成ファイルを変更します#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法 spring: rabbitmq: publisher-confirm-type: correlated
@Test public void testConfirmCallback() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack="+ack); System.out.println("confirm==== cause="+cause); //根据ACK状态做对应的消息更新操作 TODO } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美"); Thread.sleep(10000); }
spring: rabbitmq: #开启returnCallback publisher-returns: true #交换机处理消息到路由失败,则会返回给生产者 template: mandatory: true
@Test void testReturnCallback() { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+ returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback"); }
spring: rabbitmq: listener: simple: acknowledge-mode: manual
@RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+ message); System.out.println("body="+body); //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除 channel.basicAck(msgTag,false); // channel.basicNack(msgTag,false,true); }
##デッドレターキューとは
##期限内に消費されないメッセージが保存されるキュー##何メッセージがデッドレターになる状況
消費者がメッセージを拒否
(basic.reject/basic.nack)- し、再度キューに入れない
- requeue= false
#メッセージはキュー内で消費されておらず、キューまたはメッセージ自体の有効期限を超えていますTTL (存続時間)
キューのメッセージの長さが制限に達しました
- 結果: メッセージがデッドレターになった後、キューがバインドされている場合デッド レター スイッチに送信すると、メッセージはデッド レター スイッチによってデッド レター キューに再ルーティングされます。
- デッド レター キューは、遅延キューの消費によく使用されます。 遅延キュー
package com.fandf.test.rabbit;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author fandongfeng
* @date 2023/4/15 15:38
*/
@Configuration
public class RabbitMQConfig {
/**
* 订单交换机
*/
public static final String ORDER_EXCHANGE = "order_exchange";
/**
* 订单队列
*/
public static final String ORDER_QUEUE = "order_queue";
/**
* 订单路由key
*/
public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
/**
* 死信交换机
*/
public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
/**
* 死信队列 routingKey
*/
public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
/**
* 死信队列
*/
public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";
/**
* 创建死信交换机
*/
@Bean("orderDeadLetterExchange")
public Exchange orderDeadLetterExchange() {
return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
}
/**
* 创建死信队列
*/
@Bean("orderDeadLetterQueue")
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
}
/**
* 绑定死信交换机和死信队列
*/
@Bean("orderDeadLetterBinding")
public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
}
/**
* 创建订单交换机
*/
@Bean("orderExchange")
public Exchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE, true, false);
}
/**
* 创建订单队列
*/
@Bean("orderQueue")
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
}
/**
* 绑定订单交换机和队列
*/
@Bean("orderBinding")
public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
}
}
ログイン後にコピー
Consumerpackage com.fandf.test.rabbit; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author fandongfeng * @date 2023/4/15 15:38 */ @Configuration public class RabbitMQConfig { /** * 订单交换机 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 订单队列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 订单路由key */ public static final String ORDER_QUEUE_ROUTING_KEY = "order.#"; /** * 死信交换机 */ public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange"; /** * 死信队列 routingKey */ public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key"; /** * 死信队列 */ public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue"; /** * 创建死信交换机 */ @Bean("orderDeadLetterExchange") public Exchange orderDeadLetterExchange() { return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false); } /** * 创建死信队列 */ @Bean("orderDeadLetterQueue") public Queue orderDeadLetterQueue() { return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build(); } /** * 绑定死信交换机和死信队列 */ @Bean("orderDeadLetterBinding") public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs(); } /** * 创建订单交换机 */ @Bean("orderExchange") public Exchange orderExchange() { return new TopicExchange(ORDER_EXCHANGE, true, false); } /** * 创建订单队列 */ @Bean("orderQueue") public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE); //消息过期后,进入到死信交换机的路由key args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY); //过期时间,单位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 绑定订单交换机和队列 */ @Bean("orderBinding") public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs(); } }
package com.fandf.test.rabbit; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author fandongfeng * @date 2023/4/15 15:42 */ @Component @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public class OrderMQListener { @RabbitHandler public void consumer(String body, Message message, Channel channel) throws IOException { System.out.println("收到消息:" + DateUtil.now()); long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag=" + msgTag); System.out.println("message=" + message); System.out.println("body=" + body); channel.basicAck(msgTag, false); } }
@Test
void testOrder() throws InterruptedException {
//为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
rabbitTemplate.setMandatory(true);
//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
rabbitTemplate.setReturnsCallback(returned -> {
int code = returned.getReplyCode();
System.out.println("code=" + code);
System.out.println("returned=" + returned);
});
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");
System.out.println("发送消息:" + DateUtil.now());
Thread.sleep(20000);
}
ログイン後にコピープログラム出力
@Test void testOrder() throws InterruptedException { //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定 rabbitTemplate.setMandatory(true); //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息 rabbitTemplate.setReturnsCallback(returned -> { int code = returned.getReplyCode(); System.out.println("code=" + code); System.out.println("returned=" + returned); }); rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟"); System.out.println("发送消息:" + DateUtil.now()); Thread.sleep(20000); }
メッセージ送信: 2023-04-16 15:14:34
メッセージ受信: 2023-04-16 15:14:44msgTag=1
message=(本文: 'テスト注文の遅延' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1,exchange=order_exchange, time=Mon Apr 16 15 :14:44 CST 2023、routing-keys=[order]、queue=order_queue}]、x-first-death-reason=expired、x-first-death-queue=order_queue}、contentType=text/plain、contentEncoding = UTF-8、contentLength=0、receiveddeliveryMode=PERSISTENT、priority=0、redelivered=false、receivedExchange=order_dead_letter_exchange、receivedRoutingKey=order_dead_letter_queue_routing_key、deliveryTag=1、consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ、consumerQueue=order_dead_letter_queue] )体= テスト注文の遅延
以上がSpringBoot が RabbitMQ を統合して遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









React と RabbitMQ を使用して信頼性の高いメッセージング アプリケーションを構築する方法 はじめに: 最新のアプリケーションは、リアルタイム更新やデータ同期などの機能を実現するために、信頼性の高いメッセージングをサポートする必要があります。 React はユーザー インターフェイスを構築するための人気のある JavaScript ライブラリであり、RabbitMQ は信頼性の高いメッセージング ミドルウェアです。この記事では、React と RabbitMQ を組み合わせて信頼性の高いメッセージング アプリケーションを構築する方法を紹介し、具体的なコード例を示します。 RabbitMQ の概要:

RabbitMQ を使用して PHP で分散メッセージ処理を実装する方法 はじめに: 大規模なアプリケーション開発では、分散システムが一般的な要件になっています。分散メッセージ処理は、タスクを複数の処理ノードに分散することでシステムの効率と信頼性を向上させるパターンです。 RabbitMQ は、AMQP プロトコルを使用してメッセージの配信と処理を実装する、オープンソースの信頼性の高いメッセージ キュー システムです。この記事では、配布のために PHP で RabbitMQ を使用する方法について説明します。

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivotal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

最新のアプリケーションが複雑になるにつれて、メッセージングは強力なツールになっています。この分野では、RabbitMQ は、異なるアプリケーション間でメッセージを配信するために使用できるメッセージ ブローカーとして非常に人気があります。この記事では、Go 言語で RabbitMQ を使用する方法を説明します。このガイドでは以下の内容について説明します: RabbitMQ の概要 RabbitMQ のインストール RabbitMQ の基本概念 Go で RabbitMQ を使用する入門 RabbitMQ と Go

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

Golang と RabbitMQ 間のリアルタイム データ同期ソリューションの紹介: 今日の時代では、インターネットの普及とデータ量の爆発的な増加に伴い、リアルタイム データ同期の重要性がますます高まっています。非同期データ送信とデータ同期の問題を解決するために、多くの企業はメッセージ キューを使用してデータのリアルタイム同期を実現し始めています。この記事では、Golang と RabbitMQ に基づくリアルタイム データ同期ソリューションを紹介し、具体的なコード例を示します。 1. RabbitMQ とは何ですか?ラビ

現在、マイクロサービス アーキテクチャ モデルを採用する企業が増えており、このアーキテクチャではメッセージ キューが重要な通信手段となっており、その中でも RabbitMQ が広く使用されています。 Go 言語では、go-zero は近年登場したフレームワークであり、開発者がメッセージ キューをより簡単に使用できるようにするための実用的なツールやメソッドが数多く提供されています。以下では、実際のアプリケーションに基づいて go-zero を紹介します。とRabbitMQの応用実践。 1.RabbitMQ の概要Rabbit

GolangRabbitMQ: 高可用性メッセージ キュー システムのアーキテクチャ設計と実装には、特定のコード サンプルが必要です はじめに: インターネット技術の継続的な開発とその広範な応用により、メッセージ キューは現代のソフトウェア システムに不可欠な部分になりました。メッセージ キューは、デカップリング、非同期通信、フォールト トレラント処理、その他の機能を実装するツールとして、分散システムに高可用性とスケーラビリティのサポートを提供します。 Golang は効率的で簡潔なプログラミング言語として、同時実行性とパフォーマンスの高いシステムを構築するために広く使用されています。
