Maison > Java > javaDidacticiel > le corps du texte

Comment utiliser le code Java pour implémenter la file d'attente de retard RabbitMQ

PHPz
Libérer: 2023-05-12 23:55:04
avant
971 Les gens l'ont consulté

    Introduction à RabbitMQ Delay Queue

    RabbitMQ Delay Queue signifie qu'une fois qu'un message est envoyé à la file d'attente, il n'est pas immédiatement consommé par le consommateur, mais attend un certain temps avant d'être consommé par le consommateur . Ce type de file d'attente est généralement utilisé pour mettre en œuvre des tâches planifiées. Par exemple, si une commande expire et n'est pas payée, le système annule la commande et libère le stock occupé.

    Il existe de nombreuses façons d'implémenter des files d'attente dans RabbitMQ, la plus courante étant d'utiliser des plug-ins ou de l'implémenter via le mécanisme DLX (Dead Letter Exchange).

    Utilisez des plug-ins pour implémenter des files d'attente retardées

    RabbitMQ fournit le plug-in Rabbitmq_delayed_message_exchange, qui peut être utilisé pour implémenter des files d'attente retardées. Le principe de ce plug-in est d'envoyer le message à un Exchange spécifique lorsque le message est envoyé, puis Exchange transmettra le message à la file d'attente spécifiée en fonction du délai du message, réalisant ainsi la fonction du file d'attente retardée.

    Pour utiliser ce plug-in, vous devez d'abord installer le plug-in, puis créer un Exchange, définir le type d'Exchange sur x-delayed-message, puis lier Exchange à la file d'attente.

    Utilisez le mécanisme DLX pour implémenter la file d'attente différée

    Le TTL d'un message est le temps de survie du message. RabbitMQ peut définir respectivement la durée de vie des files d'attente et des messages. Le paramètre de file d'attente correspond au temps de rétention de la file d'attente sans consommateurs connectés, et vous pouvez également définir des paramètres distincts pour chaque message individuel. Passé ce délai, nous considérons le message comme mort et l'appelons une lettre morte. Si la file d'attente est définie et le message est défini, la valeur la plus petite sera utilisée. Par conséquent, si un message est acheminé vers différentes files d’attente, l’heure d’expiration du message peut être différente (paramètres de file d’attente différents). Ici, nous ne parlons que du TTL d'un seul message, car c'est la clé pour réaliser des tâches retardées. Vous pouvez définir l'heure en définissant le champ d'expiration du message ou l'attribut x-message-ttl. Les deux ont le même effet.

    Le mécanisme DLX est un mécanisme de transfert de messages fourni par RabbitMQ. Il peut transférer les messages qui ne peuvent pas être traités vers l'échange désigné, obtenant ainsi un traitement retardé des messages. Les étapes spécifiques de mise en œuvre sont les suivantes :

    • Créez un échange et une file d'attente ordinaires et liez-les ensemble.

    • Créez un échange DLX et liez un échange normal à l'échange DLX.

    • Définissez la file d'attente pour avoir un attribut TTL (Time To Live) et définissez le délai d'expiration du message.

    • Lier la file d'attente à DLX Exchange.

    Lorsque le message expire, il sera envoyé à DLX Exchange, puis DLX Exchange transmettra le message à l'échange désigné, réalisant ainsi la fonction de file d'attente différée.

    L'avantage d'utiliser le mécanisme DLX pour implémenter une file d'attente différée est qu'il n'est pas nécessaire d'installer des plug-ins supplémentaires, mais le délai d'expiration du message doit être contrôlé avec précision, sinon le délai d'expiration du message peut être inexact .

    Définissez la file d'attente différée en langage Java

    Voici les étapes pour définir la file d'attente différée via RabbitMQ en utilisant le langage Java :

    Installez le plug-in

    Tout d'abord, vous devez installer le rabbitmq_delayed_message_exchange plug-in. Il peut être installé via la commande suivante : rabbitmq_delayed_message_exchange 插件。可以通过以下命令安装:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    Copier après la connexion

    创建延时交换机

    延时队列需要使用延时交换机。可以使用 x-delayed-message 类型创建一个延时交换机。以下是创建延时交换机的示例代码:

    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
    Copier après la connexion

    创建延时队列

    创建延时队列时,需要将队列绑定到延时交换机上,并设置队列的 TTL(Time To Live)参数。以下是创建延时队列的示例代码:

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "delayed-exchange");
    args.put("x-dead-letter-routing-key", "delayed-queue");
    args.put("x-message-ttl", 5000);
    channel.queueDeclare("delayed-queue", true, false, false, args);
    channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");
    Copier après la connexion

    在上述代码中,将队列绑定到延时交换机上,并设置了队列的 TTL 参数为 5000 毫秒,即消息在发送到队列后,如果在 5000 毫秒内没有被消费者消费,则会被转发到 delayed-exchange 交换机上,并发送到 delayed-queue 队列中。

    发送延时消息

    发送延时消息时,需要设置消息的 expiration 属性,该属性表示消息的过期时间。以下是发送延时消息的示例代码:

    Map<String, Object> headers = new HashMap<>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .headers(headers)
            .expiration("5000")
            .build();
    channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());
    Copier après la connexion

    在上述代码中,设置了消息的 expiration 属性为 5000 毫秒,并将消息发送到 delayed-exchange 交换机上,路由键为 delayed-queue,消息内容为 “Hello, delayed queue!”。

    消费延时消息

    消费延时消息时,需要设置消费者的 QOS(Quality of Service)参数,以控制消费者的并发处理能力。以下是消费延时消息的示例代码:

    channel.basicQos(1);
    channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    });
    Copier après la connexion

    在上述代码中,设置了 QOS 参数为 1,即每次只处理一个消息。然后使用 basicConsume 方法消费 delayed-queue 队列中的消息,并在消费完成后,使用 basicAck

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }
    Copier après la connexion
    Copier après la connexion

    Créer un commutateur de délai

    Les files d'attente différées nécessitent l'utilisation d'un commutateur de délai. Un commutateur retardé peut être créé en utilisant le type x-delayed-message. Voici un exemple de code pour créer un commutateur de retard : 🎜rrreee🎜Création d'une file d'attente de retard🎜🎜Lors de la création d'une file d'attente de retard, vous devez lier la file d'attente au commutateur de retard et définir le paramètre TTL (Time To Live) de la file d'attente. . Voici un exemple de code pour créer une file d'attente différée : 🎜rrreee🎜Dans le code ci-dessus, la file d'attente est liée au commutateur de délai et le paramètre TTL de la file d'attente est défini sur 5 000 millisecondes, c'est-à-dire après l'envoi du message. à la file d'attente, s'il est dans les 5 000. S'il n'est pas consommé par le consommateur dans les millisecondes, il sera transmis au commutateur delayed-exchange et envoyé à la delayed-queue</code > file d'attente. 🎜🎜Envoyer un message différé🎜🎜Lors de l'envoi d'un message différé, vous devez définir l'attribut <code>expiration du message, qui indique l'heure d'expiration du message. Voici un exemple de code pour envoyer un message retardé : 🎜rrreee🎜Dans le code ci-dessus, la propriété expiration du message est définie sur 5 000 millisecondes et le message est envoyé à delayed-exchange Sur le commutateur, la clé de routage est delayed-queue et le contenu du message est "Bonjour, file d'attente retardée !". 🎜🎜Consommation de messages retardés🎜🎜Lors de la consommation de messages retardés, vous devez définir les paramètres QOS (Qualité de service) du consommateur pour contrôler les capacités de traitement simultanées du consommateur. Voici un exemple de code pour consommer des messages retardés : 🎜rrreee🎜Dans le code ci-dessus, le paramètre QOS est défini sur 1, c'est-à-dire qu'un seul message est traité à la fois. Utilisez ensuite la méthode basicConsume pour consommer le message dans la file d'attente delayed-queue, et une fois la consommation terminée, utilisez la méthode basicAck pour confirmer que le message a été consommé. 🎜🎜Grâce aux étapes ci-dessus, vous pouvez implémenter la file d'attente de retard RabbitMQ, qui est utilisée pour implémenter des fonctions telles que les tâches planifiées. 🎜

    RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

    RabbitMQ延时队列具体代码

    下面是具体代码(附注释):

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }
    Copier après la connexion
    Copier après la connexion

    在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

    注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal