環境:springboot2.3.9RELEASE RocketMQ4.8.0
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.2.0</version> </dependency>
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
發送
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
接受
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }</string>
發送
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
這裡是根據hashkey將訊息傳送到不同的佇列中
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }</string>
consumeMode = ConsumeMode.ORDERLY,指明了訊息模式為順序模式,一個佇列,一個執行緒。
結果
當consumeMode = ConsumeMode.CONCURRENTLY執行結果如下:
##叢集/廣播訊息模式發送端@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
測試
啟動兩個服務分別連接埠是8080,8081
#8080服務
##8081服務叢集訊息模式下,每個服務分別都接受了相同的訊息。
事務訊息
RocketMQ事務的3個狀態
TransactionStatus.CommitTransaction:提交事務訊息,消費者可以消費此訊息
TransactionStatus.RollbackTransaction:回滾事務,它代表該訊息將被刪除,不允許被消費。
TransactionStatus.Unknown :中間狀態,它代表需要檢查訊息佇列來決定狀態。
RocketMQ實現事務訊息主要分為兩個階段:正常事務的發送與提交、事務資訊的補償流程整體流程為:
正常事務發送與提交階段
1、生產者發送一個半訊息給MQServer(半訊息是指消費者暫時不能消費的訊息)
#2、服務端回應訊息寫入結果,半訊息發送成功
3.開始執行本地事務
4、根據本地事務的執行狀態執行Commit或Rollback操作
事務資訊的補償流程
1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求
2、生產者收到確認回查請求後,檢查本地事務的執行狀態
3、根據檢查後的結果執行Commit或Rollback操作
補償階段主要是用來解決生產者在發送Commit或Rollback操作時發生逾時或失敗的情況。
發送端
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
生產者對應的監聽器
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 这里执行本地的事务操作,比如保存数据。 try { // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据 String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 这里检查本地事务是否执行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("执行查询ID为:" + id + " 的数据是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
消費端
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> { @Override public void onMessage(Users users) { System.out.println("TX接收到消息:" + users) ; } }</users>
Service
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("数据错误") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
Controller
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }
從列印日誌看出來都保存完畢了後消費端才接受到訊息。
#刪除數據,再測試ID為1會報錯的。 #########資料庫中沒有資料。 。 。 ######是不是也不是很複雜,2個階段來處理。 ###以上是SpringBoot如何整合RocketMQ事務、廣播以及順序訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!