How Spring Boot integrates RabbitMQ to implement a delay queue
How to ensure that messages are not lost
RabbitMQ message delivery path
Producer -> Exchange -> Queue -> Consumer
Reliability therefore has to be ensured in three stages.
1. The producer delivers the message to the broker reliably.
2. The message is not lost inside the broker.
3. The consumer consumes the message successfully.
What is message delivery reliability
Put simply, it means that every message the producer sends actually reaches the broker.
We can enable confirmCallback for this.
After the producer sends a message, the broker returns an ack (or a nack) to the producer. Based on it, the producer can tell whether the message reached the broker.
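A minimal sketch of how a producer might act on these acks, assuming it keeps each outgoing message in memory under a correlation id until the broker confirms it. The class PendingConfirms and its method names are hypothetical and not part of Spring AMQP; in a real application onConfirm would be called from the confirm callback with the id carried by CorrelationData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks messages that the broker has not confirmed yet. */
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final List<String> toResend = new ArrayList<>();

    /** Call just before sending: remember the payload under its correlation id. */
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /** Call from the confirm callback with the broker's verdict. */
    synchronized void onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (payload != null && !ack) {
            toResend.add(payload); // nack: the broker did not accept the message
        }
    }

    int pendingCount() {
        return pending.size();
    }

    /** Returns the payloads that were nacked and clears the internal list. */
    synchronized List<String> drainResendList() {
        List<String> copy = new ArrayList<>(toResend);
        toResend.clear();
        return copy;
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("id-1", "order created");
        confirms.track("id-2", "order paid");
        confirms.onConfirm("id-1", true);  // acked: nothing more to do
        confirms.onConfirm("id-2", false); // nacked: schedule a resend
        System.out.println("pending=" + confirms.pendingCount());
        System.out.println("resend=" + confirms.drainResendList());
    }
}
```

Keeping the payload until the ack arrives is what lets the producer resend after a nack; a production system would more likely persist this state in a database than hold it in memory.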
Enable confirmCallback
Modify the configuration file
# NONE: publisher confirms are disabled (the default)
# CORRELATED: a callback is triggered once the message has been published to the exchange
spring:
  rabbitmq:
    publisher-confirm-type: correlated
Test code
@Test
public void testConfirmCallback() throws InterruptedException {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData correlation data supplied when the message was sent
         * @param ack             whether the exchange received the message: true on success, false on failure
         * @param cause           reason for the failure
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm=====>");
            System.out.println("confirm==== ack=" + ack);
            System.out.println("confirm==== cause=" + cause);
            // TODO: update the message record according to the ack status
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "ikun.mei", "鸡你太美");
    // Keep the test alive long enough for the asynchronous confirm to arrive
    Thread.sleep(10000);
}
Use returnCallback to check that the message was routed from the exchange to a queue: if routing fails, the message is returned to the producer.
Modify the configuration file
spring:
  rabbitmq:
    # enable returnCallback
    publisher-returns: true
    # if the exchange fails to route a message, return it to the producer
    template:
      mandatory: true
Test code
@Test
void testReturnCallback() {
    // When true, a message that the exchange cannot route is returned to the producer.
    // Not needed here if it is already set in the configuration file.
    rabbitTemplate.setMandatory(true);
    // With mandatory delivery enabled, a message that is not routed to any queue is handed back to this callback
    rabbitTemplate.setReturnsCallback(returned -> {
        int code = returned.getReplyCode();
        System.out.println("code=" + code);
        System.out.println("returned=" + returned);
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "123456", "test returnCallback");
}
When a consumer processes a message, it must acknowledge it manually (ack) to confirm that the message has been consumed.
Modify the configuration file
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
Write test code
@RabbitHandler
public void consumer(String body, Message message, Channel channel) throws IOException {
    long msgTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("msgTag=" + msgTag);
    System.out.println("message=" + message);
    System.out.println("body=" + body);
    // Acknowledge success; after this call the RabbitMQ broker deletes the message
    channel.basicAck(msgTag, false);
    // To reject the message and put it back on the queue instead:
    // channel.basicNack(msgTag, false, true);
}
deliveryTag is the delivery sequence number of a message on a channel. It increases each time the broker delivers a message on that channel, including redeliveries.
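The following toy model sketches how delivery tags and the second argument of basicAck (multiple) relate. It is plain Java, not the RabbitMQ client API: ChannelModel, deliver and ack are hypothetical names used only for illustration.

```java
import java.util.TreeMap;

/** Toy model of one channel's unacknowledged deliveries (not the RabbitMQ client API). */
final class ChannelModel {
    private long nextTag = 1; // delivery tags are scoped to a channel and grow with each delivery
    private final TreeMap<Long, String> unacked = new TreeMap<>();

    /** The broker hands a message to the consumer and assigns the next delivery tag. */
    long deliver(String body) {
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Mirrors the meaning of basicAck(deliveryTag, multiple). */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.headMap(tag, true).clear(); // ack every delivery up to and including tag
        } else {
            unacked.remove(tag); // ack only this delivery
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ChannelModel channel = new ChannelModel();
        channel.deliver("a");
        channel.deliver("b");
        long third = channel.deliver("c");
        channel.ack(2, true);      // acknowledges tags 1 and 2 together
        System.out.println("unacked=" + channel.unackedCount());
        channel.ack(third, false); // acknowledges tag 3 only
        System.out.println("unacked=" + channel.unackedCount());
    }
}
```

Passing false, as the consumer code above does, acknowledges exactly one delivery, which is the safer choice when messages are processed one at a time.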
TTL and dead letter queues
What is a dead letter queue
A queue that stores messages that could not be consumed normally, for example because they were not consumed in time
What are the circumstances under which a message becomes a dead letter
The consumer rejects the message (basic.reject/basic.nack) with requeue=false
The message has stayed in the queue unconsumed for longer than the TTL (time-to-live) of the queue or of the message itself
The queue length reaches its limit
Result: once a message becomes a dead letter, if the queue has a dead letter exchange configured, that exchange reroutes the message to the dead letter queue
Dead letter queues are often used to implement delay queues.
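To make the expiry case concrete, here is a toy model in plain Java of a queue with a queue-level TTL. It assumes that all messages share the same TTL (so they expire in order from the head) and that an expired message is discarded when no dead letter exchange is configured. TtlQueueModel and its methods are hypothetical names, not RabbitMQ APIs, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a queue with a queue-level TTL and an optional dead letter target. */
final class TtlQueueModel {
    private static final class Entry {
        final String body;
        final long enqueuedAt;

        Entry(String body, long enqueuedAt) {
            this.body = body;
            this.enqueuedAt = enqueuedAt;
        }
    }

    private final long ttlMillis;
    private final boolean hasDeadLetterExchange;
    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    TtlQueueModel(long ttlMillis, boolean hasDeadLetterExchange) {
        this.ttlMillis = ttlMillis;
        this.hasDeadLetterExchange = hasDeadLetterExchange;
    }

    void publish(String body, long nowMillis) {
        queue.addLast(new Entry(body, nowMillis));
    }

    /** Removes expired messages from the head; they are dead-lettered only if an exchange is configured. */
    void expire(long nowMillis) {
        while (!queue.isEmpty() && nowMillis - queue.peekFirst().enqueuedAt >= ttlMillis) {
            Entry expired = queue.pollFirst();
            if (hasDeadLetterExchange) {
                deadLetterQueue.add(expired.body);
            }
            // without a dead letter exchange the expired message is simply dropped
        }
    }

    int size() {
        return queue.size();
    }

    List<String> deadLetters() {
        return deadLetterQueue;
    }

    public static void main(String[] args) {
        TtlQueueModel orders = new TtlQueueModel(10_000, true);
        orders.publish("order-1", 0);
        orders.publish("order-2", 5_000);
        orders.expire(10_000); // only order-1 has waited the full 10 seconds
        System.out.println(orders.deadLetters() + " remaining=" + orders.size());
    }
}
```

If nobody consumes from the TTL queue, every message reaches the dead letter queue exactly one TTL after it was published, which is the delay effect described here.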
Delay queue
The producer does not want the message to be consumed as soon as it is delivered to the broker; it should be consumed only after a waiting period.
Using Spring Boot with RabbitMQ to close orders automatically on timeout
package com.fandf.test.rabbit;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author fandongfeng
 * @date 2023/4/15 15:38
 */
@Configuration
public class RabbitMQConfig {

    /** Order exchange */
    public static final String ORDER_EXCHANGE = "order_exchange";
    /** Order queue */
    public static final String ORDER_QUEUE = "order_queue";
    /** Order routing key */
    public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
    /** Dead letter exchange */
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
    /** Dead letter queue routing key */
    public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
    /** Dead letter queue */
    public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";

    /** Create the dead letter exchange */
    @Bean("orderDeadLetterExchange")
    public Exchange orderDeadLetterExchange() {
        return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
    }

    /** Create the dead letter queue */
    @Bean("orderDeadLetterQueue")
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
    }

    /** Bind the dead letter queue to the dead letter exchange */
    @Bean("orderDeadLetterBinding")
    public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue,
                                          @Qualifier("orderDeadLetterExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
    }

    /** Create the order exchange */
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    /** Create the order queue */
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>(3);
        // Expired messages are sent to the dead letter exchange
        args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
        // Routing key used when expired messages are sent to the dead letter exchange
        args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
        // Expiration time in milliseconds
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }

    /** Bind the order queue to the order exchange */
    @Bean("orderBinding")
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue,
                                @Qualifier("orderExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
    }
}
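The order queue is bound with the key order.#, while the test further down publishes with the routing key order. This works because, on a topic exchange, # matches zero or more dot-separated words and * matches exactly one. The matcher below is a simplified illustration of that rule in plain Java; TopicMatcher is a hypothetical name and this is not the broker's actual implementation.

```java
/** Simplified illustration of topic exchange matching ('*' = one word, '#' = zero or more words). */
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // the pattern is used up: every word must be consumed too
        }
        if (pattern[i].equals("#")) {
            // '#' may consume zero or more words, so try every possible split
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // the pattern still expects a word but none is left
        }
        boolean sameWord = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return sameWord && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("order.#", "order"));         // true
        System.out.println(matches("order.#", "order.created")); // true
        System.out.println(matches("order.*", "order"));         // false
    }
}
```

Had the binding key been order.* instead, the test message with the routing key order would not have been routed to the queue and would have come back through the return callback.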
Consumer
package com.fandf.test.rabbit;

import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author fandongfeng
 * @date 2023/4/15 15:42
 */
@Component
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {

    @RabbitHandler
    public void consumer(String body, Message message, Channel channel) throws IOException {
        System.out.println("Received message: " + DateUtil.now());
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag=" + msgTag);
        System.out.println("message=" + message);
        System.out.println("body=" + body);
        // Acknowledge the message manually (acknowledge-mode: manual)
        channel.basicAck(msgTag, false);
    }
}
Test class
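@Test
void testOrder() throws InterruptedException {
    // When true, a message that the exchange cannot route is returned to the producer.
    // Not needed here if it is already set in the configuration file.
    rabbitTemplate.setMandatory(true);
    // With mandatory delivery enabled, a message that is not routed to any queue is handed back to this callback
    rabbitTemplate.setReturnsCallback(returned -> {
        int code = returned.getReplyCode();
        System.out.println("code=" + code);
        System.out.println("returned=" + returned);
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "Test order delay");
    System.out.println("Sent message: " + DateUtil.now());
    // Wait longer than the 10-second TTL so the listener can receive the dead-lettered message
    Thread.sleep(20000);
}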
Program output
Sent message: 2023-04-16 15:14:34
Received message: 2023-04-16 15:14:44
msgTag=1
message=(Body:'Test order delay' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=Test order delay
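The listener above only prints the delayed message. To actually close an order on timeout, the handler has to check that the order is still unpaid when the message arrives, because the customer may have paid during the delay. A minimal sketch of that check, assuming an in-memory order store and the message body carrying the order id; OrderTimeoutHandler, Status and the method names are hypothetical and would be replaced by a database update in a real system.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory order store showing an idempotent "close on timeout" check. */
final class OrderTimeoutHandler {
    enum Status { UNPAID, PAID, CLOSED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void create(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    /** Marks the order as paid only if it is still unpaid. */
    boolean pay(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    /**
     * Called when the delayed message arrives.
     * Returns true only if this call moved the order from UNPAID to CLOSED,
     * so paid orders and duplicate deliveries are left untouched.
     */
    boolean onTimeout(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.CLOSED);
    }

    Status status(String orderId) {
        return orders.get(orderId);
    }

    public static void main(String[] args) {
        OrderTimeoutHandler handler = new OrderTimeoutHandler();
        handler.create("order-1");
        handler.create("order-2");
        handler.pay("order-2");
        System.out.println(handler.onTimeout("order-1") + " " + handler.status("order-1"));
        System.out.println(handler.onTimeout("order-2") + " " + handler.status("order-2"));
    }
}
```

The compare-and-replace makes the handler safe to run more than once for the same order, which matters because a message can be redelivered if the consumer fails before acknowledging it.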