SpringBoot如何實現MQTT訊息發送與接收
Spring integration互動邏輯
對於發佈者:
1.訊息透過訊息閘道傳送出去,由 MessageChannel
的實例 DirectChannel
處理傳送的細節。
2.DirectChannel
收到訊息後,內部透過 MessageHandler
的實例 MqttPahoMessageHandler
傳送到指定的 Topic。
對於訂閱者:
1.透過注入 MessageProducerSupport
的實例 MqttPahoMessageDrivenChannelAdapter
,實現訂閱Topic 和綁定訊息消費的 MessageChannel
。
2.同樣由 MessageChannel
的實例 DirectChannel
處理消費細節。
Channel 訊息後會傳送給我們自訂的 MqttInboundMessageHandler
實例進行消費。
可以看到整個處理的流程和前面將的基本一致。 Spring Integration 就是抽象化了這麼一套訊息通訊的機制,具體的通訊細節由它整合的中間件來決定。
1、maven依賴
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <version>2.5.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>5.5.5</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.5</version> </dependency>
2、yaml設定檔
#mqtt配置 mqtt: username: 123 password: 123 #MQTT-服务器连接地址,如果有多个,用逗号隔开 url: tcp://127.0.0.1:1883 #MQTT-连接服务器默认客户端ID client: id: ${random.value} default: #MQTT-默认的消息推送主题,实际可在调用接口时指定 topic: topic,mqtt/test/# #连接超时 completionTimeout: 3000
3、mqtt生產者消費者設定類別
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; /** * mqtt 推送and接收 消息类 **/ @Configuration @IntegrationComponentScan @Slf4j public class MqttSenderAndReceiveConfig { private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Autowired private MqttReceiveHandle mqttReceiveHandle; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String hostUrl; @Value("${mqtt.client.id}") private String clientId; @Value("${mqtt.default.topic}") private String defaultTopic; @Value("${mqtt.completionTimeout}") private int completionTimeout; //连接超时 /** * MQTT连接器选项 **/ @Bean(value = "getMqttConnectOptions") public MqttConnectOptions getMqttConnectOptions1() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(true); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(10); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工厂 **/ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT信息通道(生产者) **/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息处理器(生产者) **/ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调 //设置转换器 发送bytes数据 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); return messageHandler; } /** * 配置client,监听的topic * MQTT消息订阅绑定(消费者) **/ @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); adapter.setConverter(converter); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * MQTT信息通道(消费者) **/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * MQTT消息处理器(消费者) **/ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //处理接收消息 mqttReceiveHandle.handle(message); } }; } }
4、訊息處理類
/** * mqtt客户端消息处理类 **/ @Slf4j @Component public class MqttReceiveHandle { public void handle(Message<?> message) { log.info("收到订阅消息: {}", message); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); log.info("消息主题:{}", topic); Object payLoad = message.getPayload(); byte[] data = (byte[]) payLoad; Packet packet = Packet.parse(data); log.info("发送的Packet数据{}", JSON.toJSONString(packet)); } }
5、mqtt傳送介面
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; /** * mqtt发送消息 * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置) * **/ @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 发送信息到MQTT服务器 * * @param */ void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param qos 对消息处理的几种机制。 * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。 * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。 * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); }
6、mqtt事件監聽類別
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttListener { /** * 连接失败的事件通知 * @param mqttConnectionFailedEvent */ @EventListener(classes = MqttConnectionFailedEvent.class) public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) { log.info("连接失败的事件通知"); } /** * 已发送的事件通知 * @param mqttMessageSentEvent */ @EventListener(classes = MqttMessageSentEvent.class) public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) { log.info("已发送的事件通知"); } /** * 已传输完成的事件通知 * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执 * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知 * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知 * @param mqttMessageDeliveredEvent */ @EventListener(classes = MqttMessageDeliveredEvent.class) public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) { log.info("已传输完成的事件通知"); } /** * 消息订阅的事件通知 * @param mqttSubscribedEvent */ @EventListener(classes = MqttSubscribedEvent.class) public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) { log.info("消息订阅的事件通知"); } }
7、介面測試
@Resource private MqttGateway mqttGateway; /** * sendData 消息 * topic 订阅主题 **/ @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST) public String sendMqtt(String sendData, String topic) { MqttMessage mqttMessage = new MqttMessage(); mqttGateway.sendToMqtt(topic, sendData); //mqttGateway.sendToMqttObject(topic, sendData.getBytes()); return "OK"; }
以上是SpringBoot如何實現MQTT訊息發送與接收的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

MQTT在PHP開發中的容錯與安全性考量概述:MQTT(MessageQueuingTelemetryTransport)是一種輕量級的通訊協議,被廣泛用於物聯網和機器對機器(M2M)通訊。在PHP開發中使用MQTT可以實現即時訊息傳遞和遠端控制等功能。本文將介紹在PHP開發中使用MQTT時需要考慮的容錯和安全性問題,並提供一些程式碼範例供參考。一、容錯

如何使用PHP和MQTT為網站添加即時使用者聊天功能在當今網路時代,網站使用者越來越需要即時的交流和溝通,為了滿足這種需求,我們可以使用PHP和MQTT來為網站添加即時使用者聊天功能。本文將介紹如何使用PHP和MQTT實現網站即時使用者聊天功能,並提供程式碼範例。確保環境準備在開始之前,確保你已經安裝並設定了PHP和MQTT的運作環境。你可以使用XAMPP等整合開發

MQTT(MessageQueuingTelemetryTransport)是一種輕量級的訊息傳輸協議,通常用於物聯網設備之間的通訊。 PHP是一種常用的伺服器端程式語言,可以用來開發MQTT客戶端。本文將介紹如何使用PHP開發MQTT客戶端,並包含以下內容:MQTT協定的基本概念PHPMQTT客戶端程式庫的選取和使用實例:使用PHPMQTT客戶端發布和

SpringBoot和SpringMVC都是Java開發中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執行

使用PHP和MQTT實現即時數據分析的最佳實踐隨著物聯網和大數據技術的迅速發展,即時數據分析在各行各業中變得越來越重要。在即時數據分析中,MQTT(MQTelemetryTransport)作為一種輕量級的通訊協議,被廣泛應用於物聯網領域。結合PHP和MQTT,可以快速、有效率地實現即時數據分析。本文將介紹使用PHP和MQTT實現即時數據分析的最佳實踐,並

本文來寫個詳細的例子來說下dubbo+nacos+Spring Boot開發實戰。本文不會講述太多的理論的知識,會寫一個最簡單的例子來說明dubbo如何與nacos整合,快速建構開發環境。

使用PHP和MQTT建立即時聊天應用引言:隨著網路的快速發展和智慧型裝置的普及,即時通訊已經成為了現代社會中必不可少的功能之一。為了滿足人們的溝通需求,開發一個即時聊天應用程式已經成為了眾多開發者的追求目標。在本篇文章中,我們將介紹如何使用PHP和MQTT(MessageQueuingTelemetryTransport)協定來建立一個即時聊天應用。什麼是

MQTT協定的PHP實作方案比較和選擇指南摘要:MQTT(MessageQueuingTelemetryTransport)是一種輕量級的發布/訂閱通訊協議,適用於物聯網等低頻寬、高延遲的環境。本文將探討MQTT協定在PHP中的實現方案,並提供比較和選擇指南。引言:隨著物聯網的快速發展,越來越多的設備需要即時資料傳輸和通訊。 MQTT作為一種輕量級的
