> Java > java지도 시간 > Java 코드를 사용하여 RabbitMQ 지연 대기열을 구현하는 방법

Java 코드를 사용하여 RabbitMQ 지연 대기열을 구현하는 방법

PHPz
풀어 주다: 2023-05-12 23:55:04
앞으로
1008명이 탐색했습니다.

    RabbitMQ 지연 대기열 소개

    RabbitMQ 지연 대기열은 메시지가 대기열로 전송된 후 소비자가 즉시 소비하지 않고 소비자가 소비하기 전에 일정 시간 동안 기다리는 것을 의미합니다 . 이러한 종류의 대기열은 일반적으로 예약된 작업을 구현하는 데 사용됩니다. 예를 들어 주문 시간이 초과되어 지불되지 않으면 시스템은 주문을 취소하고 점유된 재고를 해제합니다.

    RabbitMQ에서 지연 대기열을 구현하는 방법은 여러 가지가 있으며, 가장 일반적인 방법은 플러그인을 사용하거나 DLX(Dead Letter Exchange) 메커니즘을 통해 구현하는 것입니다.

    플러그인을 사용하여 지연된 대기열 구현

    RabbitMQ는 지연된 대기열을 구현하는 데 사용할 수 있는 Rabbitmq_delayed_message_exchange 플러그인을 제공합니다. 이 플러그인의 원리는 메시지가 전송될 때 특정 Exchange로 메시지를 보내고, Exchange는 메시지의 지연 시간에 따라 지정된 대기열로 메시지를 전달하는 것입니다. 지연 대기열.

    이 플러그인을 사용하려면 먼저 플러그인을 설치한 다음 Exchange를 생성하고 Exchange 유형을 x-delayed-message로 설정한 다음 Exchange를 대기열에 바인딩해야 합니다.

    DLX 메커니즘을 사용하여 지연 대기열 구현

    메시지의 TTL은 메시지의 생존 시간입니다. RabbitMQ는 큐와 메시지에 대해 각각 TTL을 설정할 수 있습니다. 대기열 설정은 소비자가 연결되지 않은 상태에서 대기열이 유지되는 시간이며, 개별 메시지별로 별도의 설정도 가능합니다. 이 시간이 지나면 우리는 메시지를 죽은 것으로 간주하고 죽은 편지라고 부릅니다. 큐가 설정되고 메시지가 설정되면 더 작은 값이 사용됩니다. 따라서 메시지가 다른 대기열로 라우팅되는 경우 메시지의 종료 시간이 다를 수 있습니다(다른 대기열 설정). 여기서는 단일 메시지의 TTL에 대해서만 설명합니다. 왜냐하면 이것이 지연된 작업을 달성하는 데 핵심이기 때문입니다. 메시지의 만료 필드 또는 x-message-ttl 속성을 설정하여 시간을 설정할 수 있습니다. 둘 다 동일한 효과를 가집니다.

    DLX 메커니즘은 RabbitMQ에서 제공하는 메시지 전달 메커니즘으로, 처리할 수 없는 메시지를 지정된 Exchange로 전달하여 메시지 처리 지연을 달성할 수 있습니다. 구체적인 구현 단계는 다음과 같습니다.

    • 일반 Exchange 및 대기열을 생성하고 함께 바인딩합니다.

    • DLX Exchange를 생성하고 일반 Exchange를 DLX Exchange에 바인딩합니다.

    • 대기열에 TTL(Time To Live) 속성이 포함되도록 설정하고 메시지 만료 시간을 설정하세요.

    • 큐를 DLX Exchange에 바인딩합니다.

    메시지가 만료되면 DLX Exchange로 전송되고, DLX Exchange는 해당 메시지를 지정된 Exchange로 전달하여 지연 대기열 기능을 구현합니다.

    지연 대기열을 구현하기 위해 DLX 메커니즘을 사용하면 추가 플러그인을 설치할 필요가 없지만 메시지 만료 시간을 정확하게 제어해야 한다는 장점이 있습니다. 그렇지 않으면 메시지 만료 시간이 부정확할 수 있습니다. .

    Java 언어로 지연 대기열 설정

    다음은 Java 언어를 사용하여 RabbitMQ를 통해 지연 대기열을 설정하는 단계입니다.

    플러그인 설치

    먼저 rabbitmq_delayed_message_exchange를 설치해야 합니다. > 플러그인. 다음 명령을 통해 설치할 수 있습니다: rabbitmq_delayed_message_exchange 插件。可以通过以下命令安装:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    로그인 후 복사

    创建延时交换机

    延时队列需要使用延时交换机。可以使用 x-delayed-message 类型创建一个延时交换机。以下是创建延时交换机的示例代码:

    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
    로그인 후 복사

    创建延时队列

    创建延时队列时,需要将队列绑定到延时交换机上,并设置队列的 TTL(Time To Live)参数。以下是创建延时队列的示例代码:

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "delayed-exchange");
    args.put("x-dead-letter-routing-key", "delayed-queue");
    args.put("x-message-ttl", 5000);
    channel.queueDeclare("delayed-queue", true, false, false, args);
    channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");
    로그인 후 복사

    在上述代码中,将队列绑定到延时交换机上,并设置了队列的 TTL 参数为 5000 毫秒,即消息在发送到队列后,如果在 5000 毫秒内没有被消费者消费,则会被转发到 delayed-exchange 交换机上,并发送到 delayed-queue 队列中。

    发送延时消息

    发送延时消息时,需要设置消息的 expiration 属性,该属性表示消息的过期时间。以下是发送延时消息的示例代码:

    Map<String, Object> headers = new HashMap<>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .headers(headers)
            .expiration("5000")
            .build();
    channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());
    로그인 후 복사

    在上述代码中,设置了消息的 expiration 属性为 5000 毫秒,并将消息发送到 delayed-exchange 交换机上,路由键为 delayed-queue,消息内容为 “Hello, delayed queue!”。

    消费延时消息

    消费延时消息时,需要设置消费者的 QOS(Quality of Service)参数,以控制消费者的并发处理能力。以下是消费延时消息的示例代码:

    channel.basicQos(1);
    channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    });
    로그인 후 복사

    在上述代码中,设置了 QOS 参数为 1,即每次只处理一个消息。然后使用 basicConsume 方法消费 delayed-queue 队列中的消息,并在消费完成后,使用 basicAck

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }
    로그인 후 복사
    로그인 후 복사

    지연 스위치 만들기

    지연 대기열에는 지연 스위치를 사용해야 합니다. 지연된 스위치는 x-delayed-message 유형을 사용하여 생성할 수 있습니다. 다음은 지연 스위치 생성을 위한 샘플 코드입니다. 🎜rrreee🎜지연 대기열 생성🎜🎜지연 대기열 생성 시 대기열을 지연 스위치에 바인딩하고 대기열의 TTL(Time To Live) 매개변수를 설정해야 합니다. . 다음은 지연 대기열을 생성하는 샘플 코드입니다. 🎜rrreee🎜위 코드에서 대기열은 지연 스위치에 바인딩되어 있으며 대기열의 TTL 매개 변수는 메시지가 전송된 후인 5000밀리초로 설정됩니다. 5000 이내인 경우 대기열에 밀리초 이내에 소비자가 소비하지 않으면 delayed-exchange 스위치로 전달되고 delayed-queue</code로 전송됩니다. > 대기열. 🎜🎜지연된 메시지 보내기🎜🎜지연된 메시지를 보낼 때 메시지의 만료 시간을 나타내는 메시지의 <code>expiration 속성을 ​​설정해야 합니다. 다음은 지연된 메시지를 보내는 샘플 코드입니다. 🎜rrreee🎜위 코드에서 메시지의 expiration 속성은 5000밀리초로 설정되고 메시지는 delayed-exchange로 전송됩니다. 스위치에서 라우팅 키는 delayed-queue이고 메시지 내용은 "Hello, Delayed queue!"입니다. 🎜🎜지연된 메시지 소비🎜🎜지연된 메시지를 소비하는 경우 소비자의 QOS(서비스 품질) 매개변수를 설정하여 소비자의 동시 처리 기능을 제어해야 합니다. 다음은 지연된 메시지를 소비하기 위한 샘플 코드입니다. 🎜rrreee🎜위 코드에서 QOS 매개변수는 1로 설정됩니다. 즉, 한 번에 하나의 메시지만 처리됩니다. 그런 다음 basicConsume 메서드를 사용하여 delayed-queue 대기열의 메시지를 소비하고, 소비가 완료된 후 basicAck 메서드를 사용하여 확인합니다. 메시지가 소비되었다는 것입니다. 🎜🎜위 단계를 통해 예약된 작업 등의 기능을 구현하는 데 사용되는 RabbitMQ 지연 대기열을 구현할 수 있습니다. 🎜

    RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

    RabbitMQ延时队列具体代码

    下面是具体代码(附注释):

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }
    로그인 후 복사
    로그인 후 복사

    在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

    注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

    위 내용은 Java 코드를 사용하여 RabbitMQ 지연 대기열을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    관련 라벨:
    원천:yisu.com
    본 웹사이트의 성명
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
    인기 튜토리얼
    더>
    최신 다운로드
    더>
    웹 효과
    웹사이트 소스 코드
    웹사이트 자료
    프론트엔드 템플릿