怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸
環境設定
SpringBoot
整合 RabbitMQ
實作訊息的傳送。
1.新增 maven
依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.新增application.yml 設定檔
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx
3.設定交換器、佇列以及綁定
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; } @Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }
4.生產發送訊息
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【发送消息】" + message) return "【send message】" + message; }
5.消費者接收訊息
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time);
6.呼叫生產端發送訊息 hello
,控制台輸出:
【傳送訊息】hello
【接收訊息】hello 目前時間2022-05-12 10:21:14
說明訊息已經成功接收。
訊息遺失分析
一則訊息的從生產到消費,訊息遺失可能發生在以下階段:
-
#生產端遺失: 生產者無法傳送到
RabbitMQ
儲存端遺失:
RabbitMQ
儲存自身掛了消費端遺失:儲存因網路問題,無法傳送到消費端,或是消費掛了,無法傳送正常消費
RabbitMQ
從生產端、儲存端、消費端都對可靠度傳輸做很好的支援。
生產階段
生產階段透過請求確認機制,來確保訊息的可靠傳輸。當發送訊息到 RabbitMQ 伺服器 之後,RabbitMQ 收到訊息之後,給發送回傳一個請求確認,表示RabbitMQ 伺服器已成功的接收到了訊息。
設定application.yml
spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制 交换机 -> 队列 publisher-returns: true
設定
@Configuration @Slf4j public class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }
#訊息從 生產者 到 交換器, 有confirmCallback
確認模式。發送訊息成功後訊息會呼叫方法confirm(CorrelationData correlationData, boolean ack, String cause)
,根據 ack
判斷訊息是否成功傳送。
訊息從 交換器 到 佇列,有returnCallback
退回模式。
傳送訊息 product message
控制台輸出如下:
##生產端類比訊息遺失這裡有兩個方案:【傳送訊息】product message
【接收訊息】product message 目前時間2022-05 -12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【傳送成功】
- 發送訊息後立刻關閉broke,後者把網路關閉,但是broker關閉之後控制台一直就會報錯,發送訊息也報500錯誤。
- 發送不存在的交換器:
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
【correlationData】:null當傳送失敗可以對訊息進行重試交換器正確,傳送不存在的佇列:交換器接收到訊息,回傳成功通知,控制台輸出:# 【ack】false
【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/xxx', class-id =60, method-id=40)
【傳送失敗】
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]交換器沒有找到佇列,回傳失敗訊息:##【訊息傳送失敗】【ack 】true
【cause】null
【傳送成功】
【message】product message【replyCode】312
#RabbitMQ
開啟佇列持久化,建立的佇列和交換器
預設配置是持久化的。先把佇列和交換器設定正確,修改消費監聽的佇列,使得訊息存放在佇列裡。 修改佇列的持久化,修改成非持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }
傳送訊息之後,訊息存放在佇列中,然後重啟
RabbitMQ,訊息不存在了。 設定佇列持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }
重啟之後,佇列的訊息還存在。
消費端
消費端預設開始
ack 自動確認模式,當佇列訊息被消費者接收,不管有沒有被消費端訊息,都會自動刪除佇列中的消息。所以為了確保消費端能成功消費訊息,將自動模式改成手動確認模式:修改application.yml 檔案
spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manual
消費接收訊息之後需要手動確認:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }
如果不新增:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
傳送兩個訊息
訊息被接收後,沒有確認,重新放到佇列:
重啟項目,之後,佇列的訊息會傳送到消費者,但是沒有ack 確認,還是繼續放回佇列。
加上 channel.basicAck
之後,再重啟項目
佇列訊息就被刪除了
basicAck
方法最後一個參數 multiple
表示是刪除先前的佇列。
multiple
設定為 true
,把後面的佇列都清理掉了
以上是怎麼用SpringBoot+RabbitMQ實現訊息可靠傳輸的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

如何利用React和RabbitMQ建立可靠的訊息傳遞應用程式引言:現代化的應用程式需要支援可靠的訊息傳遞,以實現即時更新和資料同步等功能。 React是一種流行的JavaScript庫,用於建立使用者介面,而RabbitMQ是一種可靠的訊息傳遞中間件。本文將介紹如何結合React和RabbitMQ建立可靠的訊息傳遞應用,並提供具體的程式碼範例。 RabbitMQ概述:

如何在PHP中使用RabbitMQ實現分散式訊息處理引言:在大規模應用程式開發中,分散式系統已成為一個常見的需求。分散式訊息處理是這樣的一種模式,透過將任務分發到多個處理節點,可以提高系統的效率和可靠性。 RabbitMQ是一個開源的,可靠的訊息佇列系統,它採用AMQP協定來實現訊息的傳遞和處理。在本文中,我們將介紹如何在PHP中使用RabbitMQ來實現分佈

SpringBoot和SpringMVC都是Java開發中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執行

隨著現代應用程式的複雜性增加,訊息傳遞已成為一種強大的工具。在這個領域,RabbitMQ已成為一個非常受歡迎的訊息代理,可以用於在不同的應用程式之間傳遞訊息。在這篇文章中,我們將探討如何在Go語言中使用RabbitMQ。本指南將涵蓋以下內容:RabbitMQ簡介RabbitMQ安裝RabbitMQ基礎概念Go語言中的RabbitMQ入門RabbitMQ和Go

本文來寫個詳細的例子來說下dubbo+nacos+Spring Boot開發實戰。本文不會講述太多的理論的知識,會寫一個最簡單的例子來說明dubbo如何與nacos整合,快速建構開發環境。

現在越來越多的企業開始採用微服務架構模式,而在這個架構中,訊息佇列成為一種重要的通訊方式,其中RabbitMQ被廣泛應用。而在go語言中,go-zero是近年來崛起的一種框架,它提供了許多實用的工具和方法,讓開發者更輕鬆地使用訊息佇列,下面我們將結合實際應用,來介紹go-zero和RabbitMQ的使用方法和應用實務。 1.RabbitMQ概述Rabbit

隨著網路時代的到來,訊息佇列系統變得越來越重要。它可以使不同的應用之間實現非同步操作、降低耦合度、提高可擴展性,進而提升整個系統的效能和使用者體驗。在訊息佇列系統中,RabbitMQ是一個強大的開源訊息佇列軟體,它支援多種訊息協定、被廣泛應用於金融交易、電子商務、線上遊戲等領域。在實際應用中,往往需要將RabbitMQ和其他系統整合。本文將介紹如何使用sw

Golang與RabbitMQ實現即時數據同步的解決方案引言:當今時代,隨著互聯網的普及和數據量的爆發式增長,即時數據的同步變得越來越重要。為了解決資料非同步傳輸和資料同步的問題,許多公司開始採用訊息佇列的方式來實現資料的即時同步。本文將介紹基於Golang和RabbitMQ的即時資料同步的解決方案,並提供具體的程式碼範例。一、什麼是RabbitMQ? Rabbi
