Heim > Java > javaLernprogramm > Wie implementiert RocketMQ das Senden und Empfangen von Nachrichten in Springboot?

Wie implementiert RocketMQ das Senden und Empfangen von Nachrichten in Springboot?

WBOY
Freigeben: 2023-05-18 17:19:06
nach vorne
1771 Leute haben es durchsucht

springboot+rockermq realisiert das einfache Senden und Empfangen von Nachrichten

Es gibt drei Möglichkeiten, normale Nachrichten zu senden: Einwegversand, synchrones Senden und asynchrones Senden.

Im Folgenden wird die Integration von Springboot + RockerMQ vorgestellt, um das Senden und Empfangen normaler Nachrichten zu realisieren.

  • Erstellen Sie ein Springboot-Projekt und fügen Sie RockerMQ-Abhängigkeiten hinzu

    Server :
  • Port: 8083
    # Rocketmq konfigurieren
  • rocketmq:

    Nameserver: 127.0.0.1:9876

    #Produzent
  • Produzent:
#Produzentengruppenname, der in einer Anwendung eindeutig sein muss
Gruppe: Gruppe1

#Timeout für Nachrichtenversand Die Standardzeit beträgt 3000 ms
send-message-timeout: 3000
#Wenn die Nachricht 4096 Bytes erreicht, wird die Nachricht komprimiert. Standard 4096

compress-message-body-threshold: 4096

#Maximales Nachrichtenlimit, Standard ist 128K
max-message-size: 4194304
#Anzahl der Wiederholungsversuche für fehlgeschlagenes Senden der Synchronisierungsnachricht
retry-times-when-send-failed: 3
#Ob andere Agenten erneut versucht werden sollen, wenn das interne Senden fehlschlägt. Dieser Parameter ist nur wirksam, wenn mehrere Broker vorhanden sind. async-failed: 3




Erstellen Sie einen neuen Controller zum Senden von Nachrichten:


<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
Nach dem Login kopieren

SpringBoot stellt uns die Vorlagenklasse RocketMQTemplate zur Verfügung, mit der wir Nachrichten in verschiedenen Formen senden können.

Die Sendemethode gibt das Thema Thema Test-Thema an.


Erstellen Sie einen neuen Nachrichtenkonsumenten, um RocketMQConsumerListener abzuhören, auf Nachrichten zu warten und Nachrichten zu konsumieren
  • package com.example.springbootrocketdemo.controller;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * 普通信息的三种方式:同步、异步、单向
     * @author qzz
     */
    @RestController
    public class RocketMQCOntroller {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        /**
         * 发送普通消息
         * convertAndSend(String destination, Object payload) 发送字符串比较方便
         */
        @RequestMapping("/send")
        public void send(){
            rocketMQTemplate.convertAndSend("test-topic","test-message");
        }
        /**
         * 发送同步消息
         */
        @RequestMapping("/testSyncSend")
        public void testSyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
            System.out.println(sendResult);
        }
        /**
         * 发送异步消息
         */
        @RequestMapping("/testASyncSend")
        public void testASyncSend(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            //参数三:回调
            rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("消息发送异常");
                    throwable.printStackTrace();
                }
            });
        }
        /**
         * 发送单向消息
         */
        @RequestMapping("/testOneWay")
        public void testOneWay(){
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
        }
    }
    Nach dem Login kopieren

    Die Konsumentenklasse muss die RocketMQListener-Schnittstelle implementieren und den Nachrichtentyp String dynamisch angeben.

    Fügen Sie der Klasse die @RocketMQMessageListener-Annotation hinzu, geben Sie das Thema Topic Test-Topic und den Verbrauchergruppentest an

    Das einfache Senden und Empfangen von Nachrichten ist abgeschlossen!

      Starten Sie den Dienst und testen Sie den Nachrichtenverbrauch Nachrichten:

    Das obige ist der detaillierte Inhalt vonWie implementiert RocketMQ das Senden und Empfangen von Nachrichten in Springboot?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:yisu.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage