首頁 > Java > java教程 > 主體

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

WBOY
發布: 2023-05-18 10:04:05
轉載
1861 人瀏覽過

環境:springboot2.3.9RELEASE RocketMQ4.8.0

#依賴

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>
登入後複製

設定檔

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq
登入後複製

普通訊息

發送

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
登入後複製

接受

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>
登入後複製

順序訊息

發送

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }
登入後複製

這裡是根據hashkey將訊息傳送到不同的佇列中

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>
登入後複製

consumeMode = ConsumeMode.ORDERLY,指明了訊息模式為順序模式,一個佇列,一個執行緒。

結果

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

當consumeMode = ConsumeMode.CONCURRENTLY執行結果如下:

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

##叢集/廣播訊息模式

發送端

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
登入後複製
叢集訊息模式

消費端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
登入後複製
messageModel = MessageModel.CLUSTERING

測試

#啟動兩個服務分別連接埠是8080,8081

#8080服務

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

8081服務

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

##叢集訊息模式下,每個服務分別接收一部分訊息,實現了負載平衡

廣播訊息模式

消費端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
登入後複製
messageModel = MessageModel.BROADCASTING

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息測試

啟動兩個服務分別連接埠是8080,8081

#8080服務SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

##8081服務

叢集訊息模式下,每個服務分別都接受了相同的訊息。

事務訊息

RocketMQ事務的3個狀態

TransactionStatus.CommitTransaction:提交事務訊息,消費者可以消費此訊息

TransactionStatus.RollbackTransaction:回滾事務,它代表該訊息將被刪除,不允許被消費。

TransactionStatus.Unknown :中間狀態,它代表需要檢查訊息佇列來決定狀態。

RocketMQ實現事務訊息主要分為兩個階段:正常事務的發送與提交、事務資訊的補償流程整體流程為:

正常事務發送與提交階段

1、生產者發送一個半訊息給MQServer(半訊息是指消費者暫時不能消費的訊息)

#2、服務端回應訊息寫入結果,半訊息發送成功

3.開始執行本地事務

4、根據本地事務的執行狀態執行Commit或Rollback操作

事務資訊的補償流程

1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求

2、生產者收到確認回查請求後,檢查本地事務的執行狀態

3、根據檢查後的結果執行Commit或Rollback操作

補償階段主要是用來解決生產者在發送Commit或Rollback操作時發生逾時或失敗的情況。

發送端

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }
登入後複製

生產者對應的監聽器

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }
登入後複製

消費端

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>
登入後複製

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }
登入後複製

ControllerSpringBoot如何整合RocketMQ事務、廣播以及順序訊息

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }
登入後複製
測試

呼叫介面後,控制台輸出:

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息從列印日誌看出來都保存完畢了後消費端才接受到訊息。

SpringBoot如何整合RocketMQ事務、廣播以及順序訊息

#刪除數據,再測試ID為1會報錯的。

#########資料庫中沒有資料。 。 。 ######是不是也不是很複雜,2個階段來處理。 ###

以上是SpringBoot如何整合RocketMQ事務、廣播以及順序訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:yisu.com
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板