RocketMQ訊息模式主要有兩種:廣播模式、叢集模式(負載平衡模式)
廣播模式是每個消費者,都會消費訊息;
負載平衡模式是每一個消費只會被某一個消費者消費一次;
我們業務上一般用的是負載平衡模式,當然一些特殊場景需要用到廣播模式,比如發送一個訊息到郵箱,手機,站內提示;
我們可以透過@RocketMQMessageListener
的messageModel
屬性值來設置,MessageModel.BROADCASTING
是廣播模式,MessageModel.CLUSTERINGING
是預設叢集負載平衡模式
下面來介紹下springboot rockermq 整合實作廣播訊息
建立Springboot項目,加入rockermq 依賴
<!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
設定rocketmq
retry-times-when-send-async-failed: 3## 埠
server:
port: 8083#設定rocketmq
#非同步訊息發送失敗重試的次數
rocketmq:
name-server: 127.0.0.1:9876
#生產者
producer:
#生產者群組名,規定在一個應用程式裡面必須唯一
#生產者群組名稱,規定在一個應用程式裡面必須唯一
group : group1
#訊息傳送的逾時時間預設3000ms
send-message-timeout: 3000
#訊息達到4096位元組的時候,訊息就會被壓縮。預設4096
compress-message-body-threshold: 4096
#最大的訊息限制,預設為128K
# max-message-size: 4194304
max-message-size: 4194304
重複發送訊息次數#同步傳送訊息# retry-times-when-send-failed: 3
#在內部傳送失敗時是否重試其他代理,這個參數在有多個broker時才生效
retry-next-server: true
package com.example.springbootrocketdemo.controller; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 广播消息 * @author qzz */ @RestController public class RocketMQBroadCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送广播消息 */ @RequestMapping("/testBroadSend") public void testSyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 for(int i=0;i<10;i++){ rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i); } } }
#我們先叢集負載平衡測試,加上messageModel=MessageModel.CLUSTERING
消費者1:
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者1,消费消息:"+s); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("集群模式 消费者2,消费消息:"+s); } }
#叢集模式測試: 兩個消費者平攤訊息
消費者1:
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息1 广播模式,消费消息:"+s); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 广播消息 * 配置RocketMQ监听 * MessageModel.CLUSTERING:集群模式 * MessageModel.BROADCASTING:广播模式 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING) public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("广播消息2 广播模式,消费消息:"+s); } }
以上是SpringbootRocketMQ怎麼實現廣播訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!