RocketMQ 메시지 모드에는 브로드캐스트 모드와 클러스터 모드(로드 밸런싱 모드)의 두 가지가 있습니다.
브로드캐스트 모드는 모든 소비자가 메시지를 소비한다는 것을 의미합니다.
로드 밸런싱 모드는 각 소비가 특정 소비자에 의해 한 번만 소비된다는 것을 의미합니다.
우리는 일반적으로 비즈니스에서 로드 밸런싱 모드를 사용합니다. 물론 이메일, 휴대폰 또는 현장 프롬프트로 메시지를 보내는 등 일부 특수한 시나리오에서는 브로드캐스트 모드를 사용해야 합니다. 은 기본 클러스터 로드 밸런싱 모드입니다
브로드캐스트 메시지를 구현하기 위해 springboot+rockermq 통합을 도입하겠습니다@RocketMQMessageListener
的messageModel
属性值来设置,MessageModel.BROADCASTING
是广播模式,MessageModel.CLUSTERING
<!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
포트: 8083name-server: 127.0.0.1:9876
# 구성 Rocketmq
rocketmq:#Producer
프로덕션 측: 메시지를 보내기 위한 새 컨트롤러 생성
producer:
# Producer 그룹 이름은 애플리케이션에서 고유해야 합니다
group: group1
# 기본 시간 제한 메시지 전송 시간은 3000ms
send -message-timeout: 3000
# 메시지가 4096바이트에 도달하면 메시지가 압축됩니다. 기본값은 4096
압축 메시지 본문 임계값: 4096
#최대 메시지 제한, 기본값은 128K
최대 메시지 크기: 4194304
# 실패한 동기화 메시지 전송에 대한 재시도 횟수
retry-times-when-send-failed : 3
#내부 전송 실패 시 다른 에이전트를 재시도할지 여부, 이 매개변수는 브로커가 여러 개 있는 경우에만 적용됩니다.
retry-next-server: true
#비동기 메시지 전송 실패 시 재시도 횟수 retry-times-when-send -async-failed: 3
프로덕션 측에서는 일반적인 전송 로직에 따라 메시지를 보낼 수 있습니다
package com.example.springbootrocketdemo.controller; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 广播消息 * @author qzz */ @RestController public class RocketMQBroadCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送广播消息 */ @RequestMapping("/testBroadSend") public void testSyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 for(int i=0;i<10;i++){ rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i); } } }
두 개의 소비자를 생성하여 메시지 소비
먼저 클러스터 로드 밸런싱 테스트를 수행하고 messageModel=MessageModel.CLUSTERING
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者1,消费消息:"+s); } }
Consumer 2: Consumer 1
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者2,消费消息:"+s); } }
과 동일한 ConsumerGroup 및 주제에서 서비스를 시작하고 클러스터 모드를 테스트합니다. Consumption
클러스터 모드 테스트: 두 Consumer가 메시지를 동등하게 공유
위 두 Consumer의 messageModel 속성 값을 브로드캐스트 모드로 변경
Consumer 1:
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息1 广播模式,消费消息:"+s); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息2 广播模式,消费消息:"+s); } }
서비스를 다시 시작하고 방송 모드 소비를 테스트
위 내용은 Springboot의 RocketMQ에서 브로드캐스트 메시지를 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!