目錄
Spring integration互動邏輯
1、maven依賴
2、yaml設定檔
3、mqtt生產者消費者設定類別
4、訊息處理類
5、mqtt傳送介面 
6、mqtt事件監聽類別 
 7、介面測試
首頁 Java java教程 SpringBoot如何實現MQTT訊息發送與接收

SpringBoot如何實現MQTT訊息發送與接收

May 12, 2023 pm 09:31 PM
springboot mqtt

Spring integration互動邏輯

對於發佈者:

1.訊息透過訊息閘道傳送出去,由 MessageChannel 的實例 DirectChannel 處理傳送的細節。

2.DirectChannel 收到訊息後,內部透過 MessageHandler 的實例 MqttPahoMessageHandler 傳送到指定的 Topic。

對於訂閱者:

1.透過注入 MessageProducerSupport 的實例 MqttPahoMessageDrivenChannelAdapter,實現訂閱Topic 和綁定訊息消費的 MessageChannel

2.同樣由 MessageChannel 的實例 DirectChannel 處理消費細節。

Channel 訊息後會傳送給我們自訂的 MqttInboundMessageHandler 實例進行消費。

可以看到整個處理的流程和前面將的基本一致。 Spring Integration 就是抽象化了這麼一套訊息通訊的機制,具體的通訊細節由它整合的中間件來決定。

1、maven依賴

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.1</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>
登入後複製

2、yaml設定檔

#mqtt配置
mqtt:
  username: 123
  password: 123
  #MQTT-服务器连接地址,如果有多个,用逗号隔开
  url: tcp://127.0.0.1:1883
  #MQTT-连接服务器默认客户端ID
  client:
    id: ${random.value}
  default:
    #MQTT-默认的消息推送主题,实际可在调用接口时指定
    topic: topic,mqtt/test/#
    #连接超时
  completionTimeout: 3000
登入後複製

3、mqtt生產者消費者設定類別

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;
 
/**
 * mqtt 推送and接收 消息类
 **/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
 
    private static final byte[] WILL_DATA;
 
    static {
        WILL_DATA = "offline".getBytes();
    }
 
    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.url}")
    private String hostUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.default.topic}")
    private String defaultTopic;
 
    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;   //连接超时
 
    /**
     * MQTT连接器选项
     **/
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
 
    /**
     * MQTT工厂
     **/
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }
 
    /**
     * MQTT信息通道(生产者)
     **/
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息处理器(生产者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
        //设置转换器 发送bytes数据
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        return messageHandler;
    }
 
    /**
     * 配置client,监听的topic
     * MQTT消息订阅绑定(消费者)
     **/
    @Bean
    public MessageProducer inbound() {
        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = new String[topicList.size()];
        topicList.toArray(topics);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
        adapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    /**
     * MQTT信息通道(消费者)
     **/
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息处理器(消费者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理接收消息
                mqttReceiveHandle.handle(message);
            }
        };
    }
}
登入後複製

4、訊息處理類

/**
 * mqtt客户端消息处理类
 **/
@Slf4j
@Component
public class MqttReceiveHandle {
 
    public void handle(Message<?> message) {
        log.info("收到订阅消息: {}", message);
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        log.info("消息主题:{}", topic);
        Object payLoad = message.getPayload();
        byte[] data = (byte[]) payLoad;
        Packet packet = Packet.parse(data);
        log.info("发送的Packet数据{}", JSON.toJSONString(packet));
 
    }
}
登入後複製

5、mqtt傳送介面 

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
 
/**
 * mqtt发送消息
 * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
 * **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param
     */
    void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}
登入後複製

6、mqtt事件監聽類別 

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class MqttListener {
    /**
     * 连接失败的事件通知
     * @param mqttConnectionFailedEvent
     */
    @EventListener(classes = MqttConnectionFailedEvent.class)
    public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
        log.info("连接失败的事件通知");
    }
 
    /**
     * 已发送的事件通知
     * @param mqttMessageSentEvent
     */
    @EventListener(classes = MqttMessageSentEvent.class)
    public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
        log.info("已发送的事件通知");
    }
 
    /**
     * 已传输完成的事件通知
     * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
     * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
     * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
     * @param mqttMessageDeliveredEvent
     */
    @EventListener(classes = MqttMessageDeliveredEvent.class)
    public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
        log.info("已传输完成的事件通知");
    }
 
    /**
     * 消息订阅的事件通知
     * @param mqttSubscribedEvent
     */
    @EventListener(classes = MqttSubscribedEvent.class)
    public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
        log.info("消息订阅的事件通知");
    }
}
登入後複製

 7、介面測試

@Resource
    private MqttGateway mqttGateway;
    /**
     * sendData 消息
     * topic 订阅主题
     **/
    @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
    public String sendMqtt(String sendData, String topic) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttGateway.sendToMqtt(topic, sendData);
        //mqttGateway.sendToMqttObject(topic, sendData.getBytes());
        return "OK";
    }
登入後複製

以上是SpringBoot如何實現MQTT訊息發送與接收的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Java教學
1655
14
CakePHP 教程
1413
52
Laravel 教程
1306
25
PHP教程
1252
29
C# 教程
1226
24
MQTT在PHP開發中的容錯與安全性考慮 MQTT在PHP開發中的容錯與安全性考慮 Jul 08, 2023 am 11:34 AM

MQTT在PHP開發中的容錯與安全性考量概述:MQTT(MessageQueuingTelemetryTransport)是一種輕量級的通訊協議,被廣泛用於物聯網和機器對機器(M2M)通訊。在PHP開發中使用MQTT可以實現即時訊息傳遞和遠端控制等功能。本文將介紹在PHP開發中使用MQTT時需要考慮的容錯和安全性問題,並提供一些程式碼範例供參考。一、容錯

如何使用PHP和MQTT為網站新增即時使用者聊天功能 如何使用PHP和MQTT為網站新增即時使用者聊天功能 Jul 08, 2023 pm 07:46 PM

如何使用PHP和MQTT為網站添加即時使用者聊天功能在當今網路時代,網站使用者越來越需要即時的交流和溝通,為了滿足這種需求,我們可以使用PHP和MQTT來為網站添加即時使用者聊天功能。本文將介紹如何使用PHP和MQTT實現網站即時使用者聊天功能,並提供程式碼範例。確保環境準備在開始之前,確保你已經安裝並設定了PHP和MQTT的運作環境。你可以使用XAMPP等整合開發

PHP MQTT客戶端開髮指南 PHP MQTT客戶端開髮指南 Mar 27, 2024 am 09:21 AM

MQTT(MessageQueuingTelemetryTransport)是一種輕量級的訊息傳輸協議,通常用於物聯網設備之間的通訊。 PHP是一種常用的伺服器端程式語言,可以用來開發MQTT客戶端。本文將介紹如何使用PHP開發MQTT客戶端,並包含以下內容:MQTT協定的基本概念PHPMQTT客戶端程式庫的選取和使用實例:使用PHPMQTT客戶端發布和

SpringBoot與SpringMVC的比較及差別分析 SpringBoot與SpringMVC的比較及差別分析 Dec 29, 2023 am 11:02 AM

SpringBoot和SpringMVC都是Java開發中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執行

使用PHP和MQTT實現即時數據分析的最佳實踐 使用PHP和MQTT實現即時數據分析的最佳實踐 Jul 08, 2023 pm 05:57 PM

使用PHP和MQTT實現即時數據分析的最佳實踐隨著物聯網和大數據技術的迅速發展,即時數據分析在各行各業中變得越來越重要。在即時數據分析中,MQTT(MQTelemetryTransport)作為一種輕量級的通訊協議,被廣泛應用於物聯網領域。結合PHP和MQTT,可以快速、有效率地實現即時數據分析。本文將介紹使用PHP和MQTT實現即時數據分析的最佳實踐,並

SpringBoot+Dubbo+Nacos 開發實戰教程 SpringBoot+Dubbo+Nacos 開發實戰教程 Aug 15, 2023 pm 04:49 PM

本文來寫個詳細的例子來說下dubbo+nacos+Spring Boot開發實戰。本文不會講述太多的理論的知識,會寫一個最簡單的例子來說明dubbo如何與nacos整合,快速建構開發環境。

使用PHP和MQTT建立即時聊天應用 使用PHP和MQTT建立即時聊天應用 Jul 08, 2023 pm 03:18 PM

使用PHP和MQTT建立即時聊天應用引言:隨著網路的快速發展和智慧型裝置的普及,即時通訊已經成為了現代社會中必不可少的功能之一。為了滿足人們的溝通需求,開發一個即時聊天應用程式已經成為了眾多開發者的追求目標。在本篇文章中,我們將介紹如何使用PHP和MQTT(MessageQueuingTelemetryTransport)協定來建立一個即時聊天應用。什麼是

MQTT協議的PHP實現方案比較和選擇指南 MQTT協議的PHP實現方案比較和選擇指南 Jul 08, 2023 pm 10:43 PM

MQTT協定的PHP實作方案比較和選擇指南摘要:MQTT(MessageQueuingTelemetryTransport)是一種輕量級的發布/訂閱通訊協議,適用於物聯網等低頻寬、高延遲的環境。本文將探討MQTT協定在PHP中的實現方案,並提供比較和選擇指南。引言:隨著物聯網的快速發展,越來越多的設備需要即時資料傳輸和通訊。 MQTT作為一種輕量級的

See all articles