Heim > Java > javaLernprogramm > Hauptteil

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert

WBOY
Freigeben: 2023-05-18 10:04:05
nach vorne
1860 Leute haben es durchsucht

Umgebung: springboot2.3.9RELEASE + RocketMQ4.8.0

Abhängig von

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>
Nach dem Login kopieren

Konfigurationsdatei

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq
Nach dem Login kopieren

Normale Nachricht

Senden

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
Nach dem Login kopieren

Empfangen

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>
Nach dem Login kopieren

Sequentielle Nachricht

Senden

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }
Nach dem Login kopieren

Hier sind die Hashes, die der Schlüssel sendet Nachricht an In verschiedenen Warteschlangen gibt

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>
Nach dem Login kopieren

consumeMode = ConsumeMode.ORDERLY an, dass der Nachrichtenmodus der sequentielle Modus ist, eine Warteschlange und ein Thread.

Ergebnis

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert

Wenn „consumeMode“ = ConsumeMode.CONCURRENTLY, lautet das Ausführungsergebnis wie folgt:

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert

Cluster-/Broadcast-Nachrichtenmodus

Sender

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
Nach dem Login kopieren

Cluster-Nachricht. Modus

Consum ähm

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
Nach dem Login kopieren

message Model = MessageModel.CLUSTERING

Test

Starten Sie zwei Dienste, deren Ports 8080 und 8081 sind.

8080-Dienst Balancing

Broadcast-NachrichtenmodusWie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert

Verbraucherseite

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
Nach dem Login kopieren
messageModel = MessageModel.BROADCASTING

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriertTest

Starten Sie zwei Dienste mit den Ports 8080 und 8081

8080-Dienst

8081-Dienst

In-Cluster-Nachricht Im Modus empfing jeder Dienst separat dieselbe Nachricht.

TransaktionsnachrichtWie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert

3 Status von RocketMQ-Transaktionen

TransactionStatus.CommitTransaction: Transaktionsnachricht festschreiben, der Verbraucher kann diese Nachricht konsumieren

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriertTransactionStatus.RollbackTransaction: Rollback der Transaktion, was bedeutet, dass die Nachricht gelöscht und nicht zugelassen wird verzehrt werden.

TransactionStatus.Unknown: Zwischenstatus, der die Notwendigkeit darstellt, die Nachrichtenwarteschlange zu überprüfen, um den Status zu ermitteln.

RocketMQ implementiert Transaktionsnachrichten in zwei Hauptphasen: normaler Transaktionsversand und -übermittlung und Transaktionsinformationskompensationsprozess. Der Gesamtprozess ist:

Normale Transaktionsversand- und -übermittlungsphase

1 Der Produzent sendet eineinhalb Nachrichten an MQServer (Halbe Nachricht bezieht sich auf die Nachricht, die der Verbraucher vorübergehend nicht konsumieren kann)

2. Der Server antwortet auf das Ergebnis des Nachrichtenschreibens und die halbe Nachricht wird erfolgreich gesendet

3 Beginnt mit der Ausführung der lokalen Transaktion

4 oder Rollback entsprechend dem Ausführungsstatus des lokalen Transaktionsvorgangs

Kompensationsprozess der Transaktionsinformationen

1 Wenn MQServer den Ausführungsstatus der lokalen Transaktion längere Zeit nicht erhält, initiiert er eine Bestätigungsüberprüfungsoperationsanforderung an den Produzent

2. Nachdem der Produzent die Bestätigungsüberprüfungsanfrage erhalten hat, überprüfen Sie den Ausführungsstatus lokaler Transaktionen

3. Führen Sie Commit- oder Rollback-Vorgänge basierend auf den Prüfergebnissen aus

Die Kompensationsphase wird hauptsächlich zur Lösung des Problems von Zeitüberschreitungen oder Fehlern verwendet wenn Produzenten Commit- oder Rollback-Vorgänge senden.

Sender

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }
Nach dem Login kopieren

Producer entsprechender Listener

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }
Nach dem Login kopieren

Consumer

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>
Nach dem Login kopieren

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }
Nach dem Login kopieren

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }
Nach dem Login kopieren

Test

Nach dem Aufruf der Schnittstelle gibt die Konsole Folgendes aus:

Das geht aus dem Druckprotokoll hervor Der Verbraucher erhält die Nachricht erst, nachdem alles gespeichert wurde.

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriertLöschen Sie die Daten und testen Sie dann, ob die ID 1 ist. Es wird ein Fehler gemeldet.

Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriertEs sind keine Daten in der Datenbank vorhanden. . .

Es ist nicht sehr kompliziert, es wird in 2 Schritten gehandhabt. Wie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert

Das obige ist der detaillierte Inhalt vonWie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:yisu.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage