目次
Spring 統合対話ロジック
ホームページ Java &#&チュートリアル SpringBoot が MQTT メッセージの送受信を実装する方法

SpringBoot が MQTT メッセージの送受信を実装する方法

May 12, 2023 pm 09:31 PM
springboot mqtt

Spring 統合対話ロジック

パブリッシャーの場合:

1. メッセージは、MessageChannel## のインスタンスによって、メッセージ ゲートウェイを介して送信されます。 #DirectChannel は送信の詳細を処理します。

2.

DirectChannel メッセージを受信すると、MessageHandler MqttPahoMessageHandler のインスタンスを通じて、指定されたトピックに内部的に送信されます。

サブスクライバーの場合:

1.

MessageProducerSupport MqttPahoMessageDrivenChannelAdapter のインスタンスを挿入することで、トピックをサブスクライブしてバインドできます。消費されたメッセージ MessageChannel

2. 消費の詳細は、

MessageChannel のインスタンス DirectChannel によっても処理されます。

チャネル メッセージは、カスタマイズされた

MqttInboundMessageHandler インスタンスに送信されて消費されます。

全体の処理プロセスは基本的に前のプロセスと同じであることがわかります。 Spring Integration は、このような一連のメッセージ通信メカニズムを抽象化し、具体的な通信の詳細は、統合されるミドルウェアによって決定されます。

1、Maven 依存関係

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.1</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>
ログイン後にコピー

2、yaml 設定ファイル

#mqtt配置
mqtt:
  username: 123
  password: 123
  #MQTT-服务器连接地址,如果有多个,用逗号隔开
  url: tcp://127.0.0.1:1883
  #MQTT-连接服务器默认客户端ID
  client:
    id: ${random.value}
  default:
    #MQTT-默认的消息推送主题,实际可在调用接口时指定
    topic: topic,mqtt/test/#
    #连接超时
  completionTimeout: 3000
ログイン後にコピー

3、MQTT プロデューサ コンシューマ設定クラス

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;
 
/**
 * mqtt 推送and接收 消息类
 **/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
 
    private static final byte[] WILL_DATA;
 
    static {
        WILL_DATA = "offline".getBytes();
    }
 
    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.url}")
    private String hostUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.default.topic}")
    private String defaultTopic;
 
    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;   //连接超时
 
    /**
     * MQTT连接器选项
     **/
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
 
    /**
     * MQTT工厂
     **/
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }
 
    /**
     * MQTT信息通道(生产者)
     **/
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息处理器(生产者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
        //设置转换器 发送bytes数据
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        return messageHandler;
    }
 
    /**
     * 配置client,监听的topic
     * MQTT消息订阅绑定(消费者)
     **/
    @Bean
    public MessageProducer inbound() {
        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = new String[topicList.size()];
        topicList.toArray(topics);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
        adapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    /**
     * MQTT信息通道(消费者)
     **/
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息处理器(消费者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理接收消息
                mqttReceiveHandle.handle(message);
            }
        };
    }
}
ログイン後にコピー

4、メッセージ処理クラス

/**
 * mqtt客户端消息处理类
 **/
@Slf4j
@Component
public class MqttReceiveHandle {
 
    public void handle(Message<?> message) {
        log.info("收到订阅消息: {}", message);
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        log.info("消息主题:{}", topic);
        Object payLoad = message.getPayload();
        byte[] data = (byte[]) payLoad;
        Packet packet = Packet.parse(data);
        log.info("发送的Packet数据{}", JSON.toJSONString(packet));
 
    }
}
ログイン後にコピー

5. mqtt 送信インターフェイス

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
 
/**
 * mqtt发送消息
 * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
 * **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param
     */
    void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}
ログイン後にコピー

6. mqtt イベント リスニング クラス

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class MqttListener {
    /**
     * 连接失败的事件通知
     * @param mqttConnectionFailedEvent
     */
    @EventListener(classes = MqttConnectionFailedEvent.class)
    public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
        log.info("连接失败的事件通知");
    }
 
    /**
     * 已发送的事件通知
     * @param mqttMessageSentEvent
     */
    @EventListener(classes = MqttMessageSentEvent.class)
    public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
        log.info("已发送的事件通知");
    }
 
    /**
     * 已传输完成的事件通知
     * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
     * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
     * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
     * @param mqttMessageDeliveredEvent
     */
    @EventListener(classes = MqttMessageDeliveredEvent.class)
    public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
        log.info("已传输完成的事件通知");
    }
 
    /**
     * 消息订阅的事件通知
     * @param mqttSubscribedEvent
     */
    @EventListener(classes = MqttSubscribedEvent.class)
    public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
        log.info("消息订阅的事件通知");
    }
}
ログイン後にコピー

7. インターフェイス テスト

@Resource
    private MqttGateway mqttGateway;
    /**
     * sendData 消息
     * topic 订阅主题
     **/
    @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
    public String sendMqtt(String sendData, String topic) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttGateway.sendToMqtt(topic, sendData);
        //mqttGateway.sendToMqttObject(topic, sendData.getBytes());
        return "OK";
    }
ログイン後にコピー

以上がSpringBoot が MQTT メッセージの送受信を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHP 開発における MQTT の耐障害性とセキュリティに関する考慮事項 PHP 開発における MQTT の耐障害性とセキュリティに関する考慮事項 Jul 08, 2023 am 11:34 AM

PHP 開発における MQTT の耐障害性とセキュリティに関する考慮事項の概要: MQTT (MessageQueuingTelemetryTransport) は、モノのインターネットおよびマシン間 (M2M) 通信で広く使用されている軽量の通信プロトコルです。 PHP 開発で MQTT を使用すると、リアルタイム メッセージングやリモート コントロールなどの機能を実現できます。この記事では、PHP 開発で MQTT を使用するときに考慮する必要があるフォールト トレランスとセキュリティの問題を紹介し、参考用のコード例をいくつか示します。 1. 耐障害性

PHP と MQTT を使用して、リアルタイムのユーザー チャット機能を Web サイトに追加する方法 PHP と MQTT を使用して、リアルタイムのユーザー チャット機能を Web サイトに追加する方法 Jul 08, 2023 pm 07:46 PM

PHP と MQTT を使用して Web サイトにリアルタイムのユーザー チャット機能を追加する方法 今日のインターネット時代、Web サイト ユーザーはますますリアルタイムの通信とコミュニケーションを必要としています。この需要を満たすために、PHP と MQTT を使用してリアルタイムのユーザー チャット機能を追加できます。 - ウェブサイトへのユーザーチャット機能。この記事では、PHP と MQTT を使用して Web サイトのリアルタイム ユーザー チャット機能を実装する方法とコード例を紹介します。環境の準備ができていることを確認する 開始する前に、PHP および MQTT ランタイム環境がインストールおよび構成されていることを確認してください。 XAMPPなどの統合開発が利用可能

PHP MQTT クライアント開発ガイド PHP MQTT クライアント開発ガイド Mar 27, 2024 am 09:21 AM

MQTT (MessageQueuingTelemetryTransport) は、IoT デバイス間の通信に一般的に使用される軽量のメッセージ送信プロトコルです。 PHP は、MQTT クライアントの開発に使用できる、一般的に使用されるサーバー側プログラミング言語です。この記事では、PHP を使用して MQTT クライアントを開発する方法を紹介します。以下の内容が含まれます。 MQTT プロトコルの基本概念 PHPMQTT クライアント ライブラリの選択と使用例: PHPMQTT クライアントを使用した公開と使用

SpringBootとSpringMVCの比較と差異分析 SpringBootとSpringMVCの比較と差異分析 Dec 29, 2023 am 11:02 AM

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivo​​tal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

PHP と MQTT を使用してリアルタイム チャット アプリケーションを構築する PHP と MQTT を使用してリアルタイム チャット アプリケーションを構築する Jul 08, 2023 pm 03:18 PM

PHP と MQTT を使用したリアルタイム チャット アプリケーションの構築 はじめに: インターネットの急速な発展とスマート デバイスの普及により、リアルタイム コミュニケーションは現代社会において不可欠な機能の 1 つになりました。人々のコミュニケーションのニーズを満たすために、リアルタイム チャット アプリケーションの開発は多くの開発者によって追求される目標となっています。この記事では、PHP と MQTT (MessageQueuingTelemetryTransport) プロトコルを使用してリアルタイム チャット アプリケーションを構築する方法を紹介します。とは

PHP と MQTT を使用したリアルタイム データ分析のベスト プラクティス PHP と MQTT を使用したリアルタイム データ分析のベスト プラクティス Jul 08, 2023 pm 05:57 PM

PHP と MQTT を使用したリアルタイム データ分析のベスト プラクティス IoT とビッグ データ テクノロジの急速な発展に伴い、リアルタイム データ分析はさまざまな業界でますます重要になっています。リアルタイムデータ分析では、軽量な通信プロトコルとして MQTT (MQTelemetryTransport) がモノのインターネットの分野で広く使用されています。 PHP と MQTT を組み合わせることで、リアルタイムのデータ分析を迅速かつ効率的に実現できます。この記事では、PHP と MQTT を使用したリアルタイム データ分析のベスト プラクティスを紹介します。

SpringBoot+Dubbo+Nacos開発実践チュートリアル SpringBoot+Dubbo+Nacos開発実践チュートリアル Aug 15, 2023 pm 04:49 PM

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

MQTT プロトコルの PHP 実装ソリューションの比較と選択ガイド MQTT プロトコルの PHP 実装ソリューションの比較と選択ガイド Jul 08, 2023 pm 10:43 PM

MQTT プロトコルの PHP 実装比較および選択ガイド 概要: MQTT (MessageQueuingTelemetryTransport) は、モノのインターネットなどの低帯域幅、高遅延の環境に適した軽量のパブリッシュ/サブスクライブ通信プロトコルです。この記事では、PHP での MQTT プロトコルの実装を検討し、比較と選択のガイドを提供します。はじめに: モノのインターネットの急速な発展に伴い、リアルタイムのデータ送信と通信を必要とするデバイスがますます増えています。軽量としての MQTT

See all articles