Umgebung: springboot2.3.9RELEASE + RocketMQ4.8.0
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.2.0</version> </dependency>
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
Senden
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
Empfangen
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }</string>
Senden
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
Hier sind die Hashes, die der Schlüssel sendet Nachricht an In verschiedenen Warteschlangen gibt
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }</string>
consumeMode = ConsumeMode.ORDERLY an, dass der Nachrichtenmodus der sequentielle Modus ist, eine Warteschlange und ein Thread.
Ergebnis
Wenn „consumeMode“ = ConsumeMode.CONCURRENTLY, lautet das Ausführungsergebnis wie folgt:
Sender
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
Consum ähm
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
message Model = MessageModel.CLUSTERING
Test
Starten Sie zwei Dienste, deren Ports 8080 und 8081 sind.
8080-Dienst Balancing
Broadcast-Nachrichtenmodus
Verbraucherseite@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
Test
Starten Sie zwei Dienste mit den Ports 8080 und 8081
In-Cluster-Nachricht Im Modus empfing jeder Dienst separat dieselbe Nachricht.
Transaktionsnachricht
3 Status von RocketMQ-TransaktionenTransactionStatus.CommitTransaction: Transaktionsnachricht festschreiben, der Verbraucher kann diese Nachricht konsumierenTransactionStatus.RollbackTransaction: Rollback der Transaktion, was bedeutet, dass die Nachricht gelöscht und nicht zugelassen wird verzehrt werden.
TransactionStatus.Unknown: Zwischenstatus, der die Notwendigkeit darstellt, die Nachrichtenwarteschlange zu überprüfen, um den Status zu ermitteln.
Normale Transaktionsversand- und -übermittlungsphase
1 Der Produzent sendet eineinhalb Nachrichten an MQServer (Halbe Nachricht bezieht sich auf die Nachricht, die der Verbraucher vorübergehend nicht konsumieren kann)
2. Der Server antwortet auf das Ergebnis des Nachrichtenschreibens und die halbe Nachricht wird erfolgreich gesendet
3 Beginnt mit der Ausführung der lokalen Transaktion
4 oder Rollback entsprechend dem Ausführungsstatus des lokalen Transaktionsvorgangs
Kompensationsprozess der Transaktionsinformationen
1 Wenn MQServer den Ausführungsstatus der lokalen Transaktion längere Zeit nicht erhält, initiiert er eine Bestätigungsüberprüfungsoperationsanforderung an den Produzent
2. Nachdem der Produzent die Bestätigungsüberprüfungsanfrage erhalten hat, überprüfen Sie den Ausführungsstatus lokaler Transaktionen
3. Führen Sie Commit- oder Rollback-Vorgänge basierend auf den Prüfergebnissen aus
Die Kompensationsphase wird hauptsächlich zur Lösung des Problems von Zeitüberschreitungen oder Fehlern verwendet wenn Produzenten Commit- oder Rollback-Vorgänge senden.
Sender
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
Producer entsprechender Listener
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 这里执行本地的事务操作,比如保存数据。 try { // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据 String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 这里检查本地事务是否执行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("执行查询ID为:" + id + " 的数据是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
Consumer
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> { @Override public void onMessage(Users users) { System.out.println("TX接收到消息:" + users) ; } }</users>
Service
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("数据错误") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
Controller
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }
Test
Nach dem Aufruf der Schnittstelle gibt die Konsole Folgendes aus:
Das geht aus dem Druckprotokoll hervor Der Verbraucher erhält die Nachricht erst, nachdem alles gespeichert wurde.Löschen Sie die Daten und testen Sie dann, ob die ID 1 ist. Es wird ein Fehler gemeldet.
Es sind keine Daten in der Datenbank vorhanden. . .
Es ist nicht sehr kompliziert, es wird in 2 Schritten gehandhabt.
Das obige ist der detaillierte Inhalt vonWie SpringBoot RocketMQ-Transaktionen, Broadcasts und sequentielle Nachrichten integriert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!