springboot rockermq 實作簡單的訊息發送與接收
普通訊息的發送方式有3種:單向發送、同步發送和非同步發送。
下面來介紹下springboot rockermq 整合實作普通訊息的發送與接收
建立Springboot項目,加入rockermq 依賴
<!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
設定rocketmq
# 埠
server:
port: 8083設定rocketmq
。 #訊息發送的逾時時間預設3000ms
send-message-timeout: 3000
#訊息達到4096位元組的時候,訊息就會被壓縮。預設4096
compress-message-body-threshold: 4096
#最大的訊息限制,預設為128K
# max-message-size: 4194304
max-message-size: 4194304
重複發送訊息次數#同步傳送訊息# retry-times-when-send-failed: 3
#在內部傳送失敗時是否重試其他代理,這個參數在有多個broker時才生效
retry-next-server: true
#非同步訊息發送失敗重試的次數
retry-times-when-send-async-failed: 3
新建一個controller 來做訊息發送:
package com.example.springbootrocketdemo.controller; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 普通信息的三种方式:同步、异步、单向 * @author qzz */ @RestController public class RocketMQCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 * convertAndSend(String destination, Object payload) 发送字符串比较方便 */ @RequestMapping("/send") public void send(){ rocketMQTemplate.convertAndSend("test-topic","test-message"); } /** * 发送同步消息 */ @RequestMapping("/testSyncSend") public void testSyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试"); System.out.println(sendResult); } /** * 发送异步消息 */ @RequestMapping("/testASyncSend") public void testASyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 //参数三:回调 rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { System.out.println("消息发送异常"); throwable.printStackTrace(); } }); } /** * 发送单向消息 */ @RequestMapping("/testOneWay") public void testOneWay(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 rocketMQTemplate.sendOneWay("test-topic","单向消息测试"); } }
傳送方法指定Topic主題test-topic。
新訊息消費者監聽RocketMQConsumerListener,監聽訊息,消費訊息
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 消费消息 * 配置RocketMQ监听 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test",topic = "test-topic") public class RocketMQConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("消费消息:"+s); } }
介面,以及動態指定訊息類型String。
,指定topic主題test-topic,以及消費者群組test簡單的訊息發送與接收搭建完畢!
啟動服務,測試訊息消費
測試同步訊息:
測試非同步訊息: 測試單向訊息:
以上是Springboot中RocketMQ怎麼實現訊息發送與接收的詳細內容。更多資訊請關注PHP中文網其他相關文章!