> Java > java지도 시간 > 본문

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

WBOY
풀어 주다: 2023-05-18 10:04:05
앞으로
1849명이 탐색했습니다.

환경: springboot2.3.9RELEASE + RocketMQ4.8.0

에 따라 다름

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>
로그인 후 복사

구성 파일

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq
로그인 후 복사

일반 메시지

보내기

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
로그인 후 복사

수신

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>
로그인 후 복사

순차 메시지

보내기

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }
로그인 후 복사

여기에 해시 키가 전송됩니다. 다른 대기열에서

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>
로그인 후 복사

consumeMode = ConsumeMode.ORDERLY는 메시지 모드가 순차 모드, 하나의 대기열 및 하나의 스레드임을 나타냅니다.

Result

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

consumerMode = ConsumeMode.CONCURRENTLY일 때 실행 결과는 다음과 같습니다.

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

클러스터/브로드캐스트 메시지 모드

Sender

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
로그인 후 복사

클러스터 메시지 모드

Consumer

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
로그인 후 복사

메시지 모델 = MessageModel.CLUSTERING

Test

포트가 각각 8080과 8081인 두 서비스를 시작합니다.

8080 service

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

8081 service

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

클러스터 메시지 모드에서 각 서비스는 메시지의 일부를 별도로 수신합니다. 로드 밸런싱

브로드캐스트 메시지 모드

소비자 측

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>
로그인 후 복사

messageModel = MessageModel.BROADCASTING

Test

포트 8080 및 8081로 두 서비스 시작

8080 서비스

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

8081 서비스

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

클러스터 내 메시지 모드에서는 각 서비스가 동일한 메시지를 별도로 수신했습니다.

트랜잭션 메시지

RocketMQ 트랜잭션의 3가지 상태

TransactionStatus.CommitTransaction: 트랜잭션 메시지를 커밋하면 소비자가 이 메시지를 사용할 수 있습니다.

TransactionStatus.RollbackTransaction: 트랜잭션을 롤백합니다. 즉, 메시지가 삭제되고 허용되지 않습니다. 소비됩니다.

TransactionStatus.Unknown: 상태를 확인하기 위해 메시지 대기열을 확인해야 함을 나타내는 중간 상태입니다.

RocketMQ는 일반 트랜잭션 전송 및 제출, 트랜잭션 정보 보상 프로세스의 두 가지 주요 단계로 트랜잭션 메시지를 구현합니다. 전체 프로세스는 다음과 같습니다.

일반 트랜잭션 전송 및 제출 단계

1 생산자는 MQServer에 메시지 1개 반을 보냅니다. (하프 메시지는 소비자가 일시적으로 소비할 수 없는 메시지를 의미합니다.)

2. 서버가 메시지 작성 결과에 응답하고, 하프 메시지가 성공적으로 전송됩니다

3. 로컬 트랜잭션 실행을 시작합니다

4. 또는 로컬 트랜잭션의 실행 상태에 따른 롤백 작업

트랜잭션 정보 보상 프로세스

1. MQServer는 오랫동안 로컬 트랜잭션의 실행 상태를 수신하지 못한 경우 확인 검토 작업 요청을 시작합니다. producer

2. Producer가 확인 검토 요청을 받은 후 로컬 트랜잭션의 실행 상태를 확인합니다

3. 확인 결과에 따라 Commit 또는 Rollback 작업을 실행합니다.

보상 단계는 주로 타임아웃 또는 실패 문제를 해결하는 데 사용됩니다. 생산자가 커밋 또는 롤백 작업을 보낼 때.

Sender

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }
로그인 후 복사

Producer 해당 리스너

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }
로그인 후 복사

Consumer

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>
로그인 후 복사

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }
로그인 후 복사

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }
로그인 후 복사

Test

인터페이스를 호출한 후 콘솔 출력은 다음과 같습니다.

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

인쇄 로그 소비자는 모든 것이 저장된 후에만 메시지를 받습니다.

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

데이터를 삭제한 후 ID가 1인지 테스트하면 오류가 보고됩니다.

SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법

데이터베이스에 데이터가 없습니다. . .

별로 복잡하지 않고 2단계로 처리됩니다.

위 내용은 SpringBoot가 RocketMQ 트랜잭션, 브로드캐스트 및 순차 메시지를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:yisu.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿