SpringBoot
整合 RabbitMQ
實作訊息的傳送。
1.新增 maven
依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.新增application.yml 設定檔
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx
3.設定交換器、佇列以及綁定
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; } @Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }
4.生產發送訊息
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【发送消息】" + message) return "【send message】" + message; }
5.消費者接收訊息
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time);
6.呼叫生產端發送訊息 hello
,控制台輸出:
【傳送訊息】hello
【接收訊息】hello 目前時間2022-05-12 10:21:14
說明訊息已經成功接收。
一則訊息的從生產到消費,訊息遺失可能發生在以下階段:
#生產端遺失: 生產者無法傳送到 RabbitMQ
儲存端遺失: RabbitMQ
儲存自身掛了
消費端遺失:儲存因網路問題,無法傳送到消費端,或是消費掛了,無法傳送正常消費
RabbitMQ
從生產端、儲存端、消費端都對可靠度傳輸做很好的支援。
生產階段透過請求確認機制,來確保訊息的可靠傳輸。當發送訊息到 RabbitMQ 伺服器 之後,RabbitMQ 收到訊息之後,給發送回傳一個請求確認,表示RabbitMQ 伺服器已成功的接收到了訊息。
設定application.yml
spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制 交换机 -> 队列 publisher-returns: true
設定
@Configuration @Slf4j public class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }
#訊息從 生產者 到 交換器, 有confirmCallback
確認模式。發送訊息成功後訊息會呼叫方法confirm(CorrelationData correlationData, boolean ack, String cause)
,根據 ack
判斷訊息是否成功傳送。
訊息從 交換器 到 佇列,有returnCallback
退回模式。
傳送訊息 product message
控制台輸出如下:
##生產端類比訊息遺失這裡有兩個方案:【傳送訊息】product message
【接收訊息】product message 目前時間2022-05 -12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【傳送成功】
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
【correlationData】:null當傳送失敗可以對訊息進行重試交換器正確,傳送不存在的佇列:交換器接收到訊息,回傳成功通知,控制台輸出:# 【ack】false
【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/xxx', class-id =60, method-id=40)
【傳送失敗】
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]交換器沒有找到佇列,回傳失敗訊息:##【訊息傳送失敗】【ack 】true
【cause】null
【傳送成功】
【message】product message【replyCode】312
#RabbitMQ
的。先把佇列和交換器設定正確,修改消費監聽的佇列,使得訊息存放在佇列裡。 修改佇列的持久化,修改成非持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }
傳送訊息之後,訊息存放在佇列中,然後重啟
RabbitMQ,訊息不存在了。 設定佇列持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }
消費端
自動確認模式,當佇列訊息被消費者接收,不管有沒有被消費端訊息,都會自動刪除佇列中的消息。所以為了確保消費端能成功消費訊息,將自動模式改成手動確認模式:修改application.yml 檔案
spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manual
消費接收訊息之後需要手動確認:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }
如果不新增:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
傳送兩個訊息
訊息被接收後,沒有確認,重新放到佇列:
重啟項目,之後,佇列的訊息會傳送到消費者,但是沒有ack 確認,還是繼續放回佇列。
加上 channel.basicAck
之後,再重啟項目
佇列訊息就被刪除了
basicAck
方法最後一個參數 multiple
表示是刪除先前的佇列。
multiple
設定為 true
,把後面的佇列都清理掉了
以上是怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸的詳細內容。更多資訊請關注PHP中文網其他相關文章!