Home > Java > javaTutorial > How SpringBoot integrates RabbitMQ to handle dead letter queues and delay queues

How SpringBoot integrates RabbitMQ to handle dead letter queues and delay queues

王林
Release: 2023-05-15 15:28:06
forward
919 people have browsed it

Introduction

RabbitMQ message introduction

RabbitMQ messages will not time out by default.

What is a dead letter queue? What is a delay queue?

Dead letter queue:

DLX stands for Dead-Letter-Exchange; it is also called a dead letter exchange or, informally, a dead letter mailbox. When a message becomes a dead letter in a queue, it can be republished to another exchange; that exchange is the DLX. A queue bound to the DLX is called a dead letter queue.

The following situations will cause the message to become a dead letter:

  • The message is rejected (Basic.Reject/Basic.Nack), and the requeue parameter is set to false;

  • The message expired;

  • The queue reached the maximum length.

Delay queue:

A delay queue is used to store delayed messages. A delayed message is one that the producer does not want the consumer to receive immediately after it is sent; instead, the consumer can get and consume the message only after a specified period of time has passed.

Related URL

Detailed explanation of the use of dead letter queue and delay queue in RabbitMQ

Instance code

Routing configuration

package com.example.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * Declares the exchanges, queues and bindings used by the dead letter / delay queue demo.
 *
 * <p>Topology declared by {@link #initBindingTest()}:
 * <ul>
 *   <li>{@link #EXCHANGE_TOPIC_WELCOME} routes {@link #ROUTINGKEY_HELLOS} to {@link #QUEUE_HELLO}
 *       and names {@link #EXCHANGE_FANOUT_UNROUTE} as its {@code alternate-exchange}.</li>
 *   <li>{@link #QUEUE_HELLO} carries an {@code x-message-ttl} of {@link #TTL_QUEUE_MESSAGE} and
 *       dead-letters to {@link #EXCHANGE_TOPIC_DELAY} using {@link #ROUTINGKEY_DELAY} as the
 *       dead-letter routing key.</li>
 *   <li>{@link #EXCHANGE_TOPIC_DELAY} routes {@link #ROUTINGKEY_DELAY} to {@link #QUEUE_DELAY}.</li>
 *   <li>{@link #EXCHANGE_FANOUT_UNROUTE} is bound to {@link #QUEUE_UNROUTE}.</li>
 *   <li>{@link #QUEUE_HI} is declared without any explicit binding.</li>
 * </ul>
 */
@Configuration
public class RabbitRouterConfig {
    // Exchange names.
    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";
    public static final String EXCHANGE_FANOUT_UNROUTE  = "Exchange@fanout.unroute";
    public static final String EXCHANGE_TOPIC_DELAY     = "Exchange@topic.delay";

    // Topic binding patterns.
    public static final String ROUTINGKEY_HELLOS        = "hello.#";
    public static final String ROUTINGKEY_DELAY         = "delay.#";

    // Queue names.
    public static final String QUEUE_HELLO              = "Queue@hello";
    public static final String QUEUE_HI                 = "Queue@hi";
    public static final String QUEUE_UNROUTE            = "Queue@unroute";
    public static final String QUEUE_DELAY              = "Queue@delay";

    // Value passed as the x-message-ttl argument of QUEUE_HELLO.
    public static final Integer TTL_QUEUE_MESSAGE       = 5000;

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Declares every exchange, queue and binding of the demo through {@link AmqpAdmin}.
     *
     * @return a placeholder object; the bean exists only for its declaration side effects
     */
    @Bean
    Object initBindingTest() {
        declareExchanges();
        declareQueues();
        declareBindings();
        return new Object();
    }

    /** Declares the three durable, auto-delete exchanges. */
    private void declareExchanges() {
        Exchange unrouteExchange = ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE)
                .durable(true)
                .autoDelete()
                .build();
        amqpAdmin.declareExchange(unrouteExchange);

        Exchange delayExchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY)
                .durable(true)
                .autoDelete()
                .build();
        amqpAdmin.declareExchange(delayExchange);

        // Messages the welcome exchange cannot route are handed to the fanout exchange.
        Exchange welcomeExchange = ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)
                .durable(true)
                .autoDelete()
                .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)
                .build();
        amqpAdmin.declareExchange(welcomeExchange);
    }

    /** Declares the four durable queues; only QUEUE_HELLO carries extra arguments. */
    private void declareQueues() {
        Queue hiQueue = QueueBuilder.durable(QUEUE_HI).build();
        amqpAdmin.declareQueue(hiQueue);

        // TTL plus dead-letter settings: expired messages are republished to the delay exchange.
        Queue helloQueue = QueueBuilder.durable(QUEUE_HELLO)
                .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)
                .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)
                .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)
                .build();
        amqpAdmin.declareQueue(helloQueue);

        Queue unrouteQueue = QueueBuilder.durable(QUEUE_UNROUTE).build();
        amqpAdmin.declareQueue(unrouteQueue);

        Queue delayQueue = QueueBuilder.durable(QUEUE_DELAY).build();
        amqpAdmin.declareQueue(delayQueue);
    }

    /** Binds each queue to its exchange. */
    private void declareBindings() {
        bindQueue(QUEUE_HELLO, EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS);
        bindQueue(QUEUE_UNROUTE, EXCHANGE_FANOUT_UNROUTE, "");
        bindQueue(QUEUE_DELAY, EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY);
    }

    /** Declares a queue-to-exchange binding with no binding arguments. */
    private void bindQueue(String queueName, String exchangeName, String routingKey) {
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,
                exchangeName, routingKey, null);
        amqpAdmin.declareBinding(binding);
    }
}
Copy after login

Controller

package com.example.controller;
 
import com.example.config.RabbitRouterConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.time.LocalDateTime;
 
/**
 * REST endpoints that publish timestamped demo messages through {@link Sender}.
 *
 * <p>All endpoints are POST-only and return no body.
 */
@RestController
public class HelloController {
    @Autowired
    private Sender sender;

    /** Publishes a message using the queue name {@code Queue@hi} as the routing key. */
    @PostMapping("/hi")
    public void hi() {
        String body = timestamped("hi1 message:");
        sender.send(RabbitRouterConfig.QUEUE_HI, body);
    }

    /**
     * Publishes a message with routing key {@code hello.a} via the two-argument send.
     *
     * <p>NOTE(review): no exchange is named here, unlike {@link #hello2()}; which exchange
     * receives the message depends on the template configuration — confirm this is intended.
     */
    @PostMapping("/hello1")
    public void hello1() {
        String body = timestamped("hello1 message:");
        sender.send("hello.a", body);
    }

    /** Publishes a message to the welcome topic exchange with routing key {@code hello.b}. */
    @PostMapping("/hello2")
    public void hello2() {
        String body = timestamped("hello2 message:");
        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", body);
    }

    /**
     * Publishes a message to the welcome topic exchange with routing key {@code nonono},
     * which does not match the {@code hello.#} binding pattern.
     */
    @PostMapping("/ae")
    public void aeTest() {
        String body = timestamped("ae message:");
        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", body);
    }

    /** Appends the current local date-time to the given prefix. */
    private static String timestamped(String prefix) {
        return prefix + LocalDateTime.now();
    }
}
Copy after login

Sender

package com.example.mq;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
/**
 * Thin wrapper around {@link AmqpTemplate} for publishing String messages.
 */
@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Converts and sends a message with the given routing key, without naming an exchange.
     *
     * @param routingKey routing key passed to the template
     * @param message    payload to convert and send
     */
    public void send(String routingKey, String message) {
        rabbitTemplate.convertAndSend(routingKey, message);
    }

    /**
     * Converts and sends a message to a specific exchange.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key passed to the template
     * @param message    payload to convert and send
     */
    public void send(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}
Copy after login

Receiver

package com.example.mq;
 
import com.example.config.RabbitRouterConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * Message listeners for the demo queues; each listener prints what it receives.
 */
@Component
public class Receiver {
    /** How long the delay listener blocks after printing a message, in milliseconds. */
    private static final long DELAY_LISTENER_PAUSE_MILLIS = 5 * 1000;

    /**
     * Prints every message received from {@code Queue@hi}.
     *
     * @param payload message body
     */
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
    public void hi(String payload) {
        String line = "Receiver(hi) : " + payload;
        System.out.println(line);
    }

    // Disabled listeners, kept for reference.
    // NOTE(review): presumably disabled so that messages left in Queue@hello are not consumed
    // there and can expire into the delay queue — confirm.
    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
    // public void hello(String hello) throws InterruptedException {
    //     System.out.println ("Receiver(hello) : "  + hello);
    //     Thread.sleep(5 * 1000);
    //     System.out.println("(hello):sleep over");
    // }
    //
    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)
    // public void unroute(String hello) throws InterruptedException {
    //     System.out.println ("Receiver(unroute) : "  + hello);
    //     Thread.sleep(5 * 1000);
    //     System.out.println("(unroute):sleep over");
    // }

    /**
     * Prints every message received from {@code Queue@delay}, then blocks for five seconds
     * before printing a completion line.
     *
     * @param hello message body
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)
    public void delay(String hello) throws InterruptedException {
        String line = "Receiver(delay) : " + hello;
        System.out.println(line);
        Thread.sleep(DELAY_LISTENER_PAUSE_MILLIS);
        System.out.println("(delay):sleep over");
    }
}
Copy after login

application.yml

# Spring Boot configuration for the RabbitMQ demo.
# The active values configure the receiver instance. The commented-out port and
# application name are presumably the sender's settings (the walkthrough calls
# the sender on port 9100) — swap them in when starting the sender.
server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  rabbitmq:
    # Broker connection settings. NOTE(review): demo credentials in plain text;
    # do not reuse this pattern for real environments.
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /
    # NOTE(review): newer Spring Boot releases replace publisher-confirms with
    # publisher-confirm-type — confirm against the Boot version in use.
    publisher-confirms: true
    publisher-returns: true
# Manual acknowledge mode is left disabled below, so the listener containers
# use their default acknowledge mode.
#    listener:
#      simple:
#        acknowledge-mode: manual
#      direct:
#        acknowledge-mode: manual
Copy after login

Instance test

Start the sender and receiver separately.

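Send a POST request to http://localhost:9100/hello2 (the endpoint is mapped with @PostMapping, so a plain browser GET will not reach it).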

Output after five seconds:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
(delay):sleep over

The above is the detailed content of How SpringBoot integrates RabbitMQ to handle dead letter queues and delay queues. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:yisu.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template