Heim > Java > javaLernprogramm > Hauptteil

So verwenden Sie Java-Code zum Implementieren der RabbitMQ-Verzögerungswarteschlange

PHPz
Freigeben: 2023-05-12 23:55:04
nach vorne
971 Leute haben es durchsucht

    Einführung in die RabbitMQ-Verzögerungswarteschlange

    RabbitMQ-Verzögerungswarteschlange bedeutet, dass eine Nachricht, nachdem sie an die Warteschlange gesendet wurde, nicht sofort vom Verbraucher konsumiert wird, sondern eine gewisse Zeit wartet, bevor sie vom Verbraucher konsumiert wird . Diese Art von Warteschlange wird normalerweise verwendet, um geplante Aufgaben auszuführen. Wenn beispielsweise eine Bestellung abläuft und nicht bezahlt wird, storniert das System die Bestellung und gibt den belegten Bestand frei.

    Es gibt viele Möglichkeiten, Verzögerungswarteschlangen in RabbitMQ zu implementieren. Die gebräuchlichste davon ist die Verwendung von Plug-Ins oder die Implementierung über den DLX-Mechanismus (Dead Letter Exchange).

    Verwenden Sie Plug-Ins, um verzögerte Warteschlangen zu implementieren

    RabbitMQ stellt das Rabbitmq_delayed_message_exchange-Plug-In bereit, mit dem verzögerte Warteschlangen implementiert werden können. Das Prinzip dieses Plug-Ins besteht darin, die Nachricht beim Senden der Nachricht an eine bestimmte Börse zu senden. Anschließend leitet die Börse die Nachricht entsprechend der Verzögerungszeit in der Nachricht an die angegebene Warteschlange weiter und realisiert so die Funktion von Verzögerungswarteschlange.

    Um dieses Plug-in zu verwenden, müssen Sie zuerst das Plug-in installieren, dann einen Exchange erstellen, den Typ des Exchange auf x-delayed-message festlegen und dann den Exchange an die Warteschlange binden.

    Verwenden Sie den DLX-Mechanismus, um eine Verzögerungswarteschlange zu implementieren.

    Die TTL einer Nachricht ist die Überlebenszeit der Nachricht. RabbitMQ kann TTL für Warteschlangen bzw. Nachrichten festlegen. Die Warteschlangeneinstellung ist die Aufbewahrungszeit der Warteschlange ohne angeschlossene Verbraucher. Sie können auch separate Einstellungen für jede einzelne Nachricht festlegen. Nach dieser Zeit betrachten wir die Nachricht als tot und nennen sie einen toten Brief. Wenn die Warteschlange festgelegt ist und die Nachricht festgelegt ist, wird der kleinere Wert verwendet. Wenn eine Nachricht daher an verschiedene Warteschlangen weitergeleitet wird, kann die Todeszeit der Nachricht unterschiedlich sein (unterschiedliche Warteschlangeneinstellungen). Hier sprechen wir nur über die TTL einer einzelnen Nachricht, da sie der Schlüssel zum Erreichen verzögerter Aufgaben ist. Sie können die Zeit festlegen, indem Sie das Ablauffeld der Nachricht oder das Attribut x-message-ttl festlegen. Beide haben den gleichen Effekt.

    DLX-Mechanismus ist ein von RabbitMQ bereitgestellter Nachrichtenweiterleitungsmechanismus. Er kann Nachrichten, die nicht verarbeitet werden können, an die angegebene Börse weiterleiten und so eine verzögerte Verarbeitung von Nachrichten erreichen. Die spezifischen Implementierungsschritte lauten wie folgt:

    • Erstellen Sie einen gewöhnlichen Austausch und eine Warteschlange und binden Sie sie zusammen.

    • Erstellen Sie eine DLX-Börse und binden Sie eine normale Börse an die DLX-Börse.

    • Stellen Sie die Warteschlange so ein, dass sie ein TTL-Attribut (Time To Live) hat, und legen Sie die Ablaufzeit der Nachricht fest.

    • Warteschlange an DLX Exchange binden.

    Wenn die Nachricht abläuft, wird sie an DLX Exchange gesendet, und DLX Exchange leitet die Nachricht dann an die angegebene Börse weiter, wodurch die Verzögerungswarteschlangenfunktion realisiert wird.

    Der Vorteil der Verwendung des DLX-Mechanismus zur Implementierung einer Verzögerungswarteschlange besteht darin, dass keine zusätzlichen Plug-Ins installiert werden müssen, sondern die Ablaufzeit der Nachricht genau gesteuert werden muss, da sonst die Ablaufzeit der Nachricht möglicherweise ungenau ist .

    Legen Sie die Verzögerungswarteschlange in der Java-Sprache fest

    Im Folgenden sind die Schritte aufgeführt, um die Verzögerungswarteschlange über RabbitMQ in der Java-Sprache festzulegen:

    Installieren Sie das Plug-in

    Zuerst müssen Sie den rabbitmq_delayed_message_exchangeinstallieren > Plug-in. Es kann über den folgenden Befehl installiert werden:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    Nach dem Login kopieren
    rabbitmq_delayed_message_exchange 插件。可以通过以下命令安装:

    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
    Nach dem Login kopieren

    创建延时交换机

    延时队列需要使用延时交换机。可以使用 x-delayed-message 类型创建一个延时交换机。以下是创建延时交换机的示例代码:

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "delayed-exchange");
    args.put("x-dead-letter-routing-key", "delayed-queue");
    args.put("x-message-ttl", 5000);
    channel.queueDeclare("delayed-queue", true, false, false, args);
    channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");
    Nach dem Login kopieren

    创建延时队列

    创建延时队列时,需要将队列绑定到延时交换机上,并设置队列的 TTL(Time To Live)参数。以下是创建延时队列的示例代码:

    Map<String, Object> headers = new HashMap<>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .headers(headers)
            .expiration("5000")
            .build();
    channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());
    Nach dem Login kopieren

    在上述代码中,将队列绑定到延时交换机上,并设置了队列的 TTL 参数为 5000 毫秒,即消息在发送到队列后,如果在 5000 毫秒内没有被消费者消费,则会被转发到 delayed-exchange 交换机上,并发送到 delayed-queue 队列中。

    发送延时消息

    发送延时消息时,需要设置消息的 expiration 属性,该属性表示消息的过期时间。以下是发送延时消息的示例代码:

    channel.basicQos(1);
    channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    });
    Nach dem Login kopieren

    在上述代码中,设置了消息的 expiration 属性为 5000 毫秒,并将消息发送到 delayed-exchange 交换机上,路由键为 delayed-queue,消息内容为 “Hello, delayed queue!”。

    消费延时消息

    消费延时消息时,需要设置消费者的 QOS(Quality of Service)参数,以控制消费者的并发处理能力。以下是消费延时消息的示例代码:

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }
    Nach dem Login kopieren
    Nach dem Login kopieren

    在上述代码中,设置了 QOS 参数为 1,即每次只处理一个消息。然后使用 basicConsume 方法消费 delayed-queue 队列中的消息,并在消费完成后,使用 basicAckVerzögerungsschalter erstellen

    Verzögerungswarteschlangen erfordern die Verwendung eines Verzögerungsschalters. Mit dem Typ x-delayed-message kann ein verzögerter Switch erstellt werden. Das Folgende ist ein Beispielcode zum Erstellen eines Verzögerungsschalters:

    rrreee🎜Eine Verzögerungswarteschlange erstellen🎜🎜Beim Erstellen einer Verzögerungswarteschlange müssen Sie die Warteschlange an den Verzögerungsschalter binden und den TTL-Parameter (Time To Live) der Warteschlange festlegen . Das Folgende ist ein Beispielcode zum Erstellen einer Verzögerungswarteschlange: 🎜rrreee🎜Im obigen Code ist die Warteschlange an den Verzögerungsschalter gebunden und der TTL-Parameter der Warteschlange ist auf 5000 Millisekunden eingestellt, dh nach dem Senden der Nachricht an die Warteschlange, wenn es innerhalb von 5000 liegt. Wenn es nicht innerhalb von Millisekunden vom Verbraucher verbraucht wird, wird es an den delayed-exchange-Switch weitergeleitet und an die delayed-queue</code gesendet > Warteschlange. 🎜🎜Eine verzögerte Nachricht senden🎜🎜Wenn Sie eine verzögerte Nachricht senden, müssen Sie das Attribut <code>expiration der Nachricht festlegen, das die Ablaufzeit der Nachricht angibt. Das Folgende ist ein Beispielcode zum Senden einer verzögerten Nachricht: 🎜rrreee🎜Im obigen Code wird die Eigenschaft expiration der Nachricht auf 5000 Millisekunden gesetzt und die Nachricht wird an delayed-exchange gesendet Auf dem Switch ist der Routing-Schlüssel delayed-queue und der Nachrichteninhalt ist „Hallo, verzögerte Warteschlange!“. 🎜🎜Verzögerte Nachrichten konsumieren🎜🎜Beim Konsumieren verzögerter Nachrichten müssen Sie die QOS-Parameter (Quality of Service) des Verbrauchers festlegen, um die gleichzeitigen Verarbeitungsfähigkeiten des Verbrauchers zu steuern. Das Folgende ist ein Beispielcode für die Verarbeitung verzögerter Nachrichten: 🎜rrreee🎜Im obigen Code ist der QOS-Parameter auf 1 gesetzt, d. h. es wird jeweils nur eine Nachricht verarbeitet. Verwenden Sie dann die Methode basicConsume, um die Nachricht in der Warteschlange delayed-queue zu konsumieren, und verwenden Sie nach Abschluss des Konsums die Methode basicAck zur Bestätigung dass die Nachricht verbraucht wurde. 🎜🎜Durch die oben genannten Schritte können Sie die RabbitMQ-Verzögerungswarteschlange implementieren, die zum Implementieren von Funktionen wie geplanten Aufgaben verwendet wird. 🎜

    RabbitMQ延时队列是一种常见的消息队列应用场景,它可以在消息发送后指定一定的时间后才能被消费者消费,通常用于实现一些延时任务,例如订单超时未支付自动取消等。

    RabbitMQ延时队列具体代码

    下面是具体代码(附注释):

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class DelayedQueueExample {
        private static final String EXCHANGE_NAME = "delayed_exchange";
        private static final String QUEUE_NAME = "delayed_queue";
        private static final String ROUTING_KEY = "delayed_routing_key";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /*
             Exchange.DeclareOk exchangeDeclare(String exchange,
                                                  String type,
                                                  boolean durable,
                                                  boolean autoDelete,
                                                  boolean internal,
                                                  Map<String, Object> arguments) throws IOException;
                                                  */
            // 创建一个支持延时队列的Exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");
            channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    
            // 创建一个延时队列,设置x-dead-letter-exchange和x-dead-letter-routing-key参数
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-dead-letter-exchange", "");
            queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME);
            queueArguments.put("x-message-ttl", 5000);
            channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送消息到延时队列中,设置expiration参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("10000")
                    .build();
            String message = "Hello, delayed queue!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
            System.out.println("Sent message to delayed queue: " + message);
            channel.close();
            connection.close();
        }
    }
    Nach dem Login kopieren
    Nach dem Login kopieren

    在上面的代码中,我们创建了一个支持延时队列的Exchange,并创建了一个延时队列,设置了x-dead-letter-exchange和x-dead-letter-routing-key参数。然后,我们发送了一条消息到延时队列中,设置了expiration参数,表示这条消息延时10秒后才能被消费。

    注意,如果我们想要消费延时队列中的消息,需要创建一个消费者,并监听这个队列。当消息被消费时,需要发送ack确认消息已经被消费,否则消息会一直留在队列中。

    Das obige ist der detaillierte Inhalt vonSo verwenden Sie Java-Code zum Implementieren der RabbitMQ-Verzögerungswarteschlange. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Verwandte Etiketten:
    Quelle:yisu.com
    Erklärung dieser Website
    Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
    Beliebte Tutorials
    Mehr>
    Neueste Downloads
    Mehr>
    Web-Effekte
    Quellcode der Website
    Website-Materialien
    Frontend-Vorlage