首頁 Java java教程 springboot如何整合mqtt

springboot如何整合mqtt

May 15, 2023 pm 04:25 PM
springboot mqtt

springboot 整合mqtt

建置的時候如果你使用的是叢集記得開放以下埠:

springboot如何整合mqtt

好了,搭建成功下一步就是我們的java程式要與mqtt連接, 這裡有兩種方式(其實不止兩種)進行連接.

一是直接使用MQTT Java 客戶端庫

二是使用spring integration mqtt也是比較推薦的一種,也是我們主講這種.

第一步添加maven dependency

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>
登入後複製

第二步新增配置

1 先寫好一些基本配置

mqtt:
 username: test                        # 账号
 password: 123456                      # 密码
 host-url: tcp://127.0.0.1:1883        # mqtt连接tcp地址
 in-client-id: ${random.value}         # 随机值,使出入站 client ID 不同
 out-client-id: ${random.value}
 client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value}
 default-topic: test/#,topic/+/+/up         # 默认主题
 timeout: 60                                # 超时时间
 keepalive: 60                              # 保持连接
 clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
登入後複製

2.然後寫一個對應的類別MqttProperties

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MqttProperties 
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Component
public class MqttProperties {

    /**
     * 用户名
     */
    @Value("${mqtt.username}")
    private String username;

    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 连接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /**
     * 进-客户Id
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /**
     * 出-客户Id
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /**
     * 客户Id
     */
    @Value("${mqtt.client-id}")
    private String clientId;

    /**
     * 默认连接话题
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * 超时时间
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * 保持连接数
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /**是否清除session*/
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

	// ...getter and setter

}
登入後複製

接下來就是配置一些亂七八糟的東西, 這裡有很多概念性的東西例如管道channel, 適配器adapter, 入站Inbound, 出站Outbound,等等等等,看起來是非常頭痛的

好吧,那就一個一個來,

首先連接mqtt需要一個客戶端, 那麼我們就開一個客戶端工廠, 這裡可以產生很多很多的客戶端

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
登入後複製

然後再搞兩根管子(channel),一個出站,一個入站

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
登入後複製

為了讓這些管子能流通就需要一個適配器(adapter)

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }
登入後複製

然後定義訊息生產者

    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }
登入後複製

那我們收到訊息去哪裡處理呢,答案是這裡:

    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
    	// 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)
        return mqttMessageHandle;
		// 你也可以这样写
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };
登入後複製

到這裡我們其實已經可以接受到來自mqtt的訊息了

接下來配置向mqtt發送訊息

配置出站處理器

    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }
登入後複製

這個出站處理器在我看來就是讓別人(MqttPahoMessageHandler)處理了, 我就不處理了,我只管我要發送什麼,至於怎麼發送,由MqttPahoMessageHandler來完成

接下來我們定義一個介面即可

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}
登入後複製

我們直接呼叫這個介面就可以向mqtt 發送資料

到目前為止,整個設定檔長這樣:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Configuration
public class MqttConfig {


    /**
     *  以下属性将在配置文件中读取
     **/
    @Autowired
    private MqttProperties mqttProperties;


    //Mqtt 客户端工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }


    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }


    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return mqttMessageHandle;
    }

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }


    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
}
登入後複製

處理訊息的MqttMessageHandle

@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
     
    }
}
登入後複製

在進一步了解之後,發現可以優化的地方,比如channel 的類型是有很多種的, 這裡使用的DirectChannel,是Spring Integration預設的訊息通道,它將訊息發送給為一個訂閱者,然後阻礙發送直到訊息被接收,傳輸方式都是同步的方式,都是由一個線程來運行的.

這裡我們可以將入站channel改成ExecutorChannel一個可以使用多執行緒的channel

@Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        // 用线程池
        return new ExecutorChannel(mqttThreadPoolTaskExecutor());
    }
登入後複製

到這裡其實可以運行了.

但是這樣配置其實還是有點亂, 於是我查找官網, f發現一種更簡單的配置方法叫Java DSL

我們參考官網,稍微改一下,使用DSL的方式進行配置:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Configuration
public class MqttConfigV2 {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;


    //Mqtt 客户端工厂 所有客户端从这里产生
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

    // 消息生产者 (接收,处理来自mqtt的消息)
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 出站处理器 (向 mqtt 发送消息)
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {

        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }

}
登入後複製

這樣看起來真的簡單多了, 頭也沒那麼大了, 我要是早知道多好.

好了以上就是配置相關的, 到這裡其實是已經完成springboot 與mqtt 的整合了.

但其實我一直有個想法, 就是我們接收的訊息都是在handleMessage這個方法裡面執行的,

	@Override
    public void handleMessage(Message<?> message) throws MessagingException {
     			
    }
登入後複製

所以我就有了一個想法, 能不能根據我訂閱的主題,在不同的方法執行, 對於這個問題,其實你用if ... else . ..也能實現, 但很明顯,如果我訂閱的主題很多的話, 那寫起來就很頭痛了.

對於這個問題,有兩種思路, 一個是添加 Spring Integration的路由router,根據不同topic路由到不同的channel, 這個我也知道能不能實現, 我這裡就不討論了.

#第二種是, 我也不知道名字改如何叫, 我是參考了spring@Controller的設計, 暫且叫他註解模式.

眾所周知,我們的介面都是在類別上加@Controller這個註解, 就代表這個類別是http 介面, 再在方法加上@RequestMapping就能實現不同的url 調用不同的方法.

參數這個設計我們在類別上面加@MqttService就代表這個類別是專門處理mqtt訊息的服務類別
同時在這個類別的方法上加上@MqttTopic就代表這個主題由這個方法處理.

OK, 理論有了,接下來就是實踐.

先定義兩個註解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}
登入後複製

加上@Component註解spring就會掃描, 並註冊到IOC容器裡

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {

    /**
     * 主题名字
     */
    String value() default "";

}
登入後複製

參考@RequestMapping我們使用起來應該是這樣的:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */
@MqttService
public class MqttTopicHandle {

    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

	// 这里的 # 号是通配符
    @MqttTopic("test/#")
    public void test(Message<?> message){
        log.info("test="+message.getPayload());
    }
	
	// 这里的 + 号是通配符
    @MqttTopic("topic/+/+/up")
    public void up(Message<?> message){
        log.info("up="+message.getPayload());
    }

	// 注意 你必须先订阅
    @MqttTopic("topic/1/2/down")
    public void down(Message<?> message){
        log.info("down="+message.getPayload());
    }
}
登入後複製

OK 接下來就是實作這樣的使用

分析:

當我們收到訊息時, 我們從IOC容器中找到所有帶@MqttService註解的類別

然後遍歷這些類別, 找到帶有@MqttTopic的方法

接著把@MqttTopic註解的的值與接受到的topic 進行比較

如果一致則執行這個方法

廢話少說, 上程式碼

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);

    // 包含 @MqttService注解 的类(Component)
    public static Map<String, Object> mqttServices;


    /**
     * 所有mqtt到达的消息都会在这里处理
     * 要注意这个方法是在线程池里面运行的
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }

    public Map<String, Object> getMqttServices(){
        if(mqttServices==null){
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }

    public void getMqttTopicService(Message<?> message){
        // 在这里 我们根据不同的 主题 分发不同的消息
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
        if(receivedTopic==null || "".equals(receivedTopic)){
            return;
        }
        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
        	// 把所有带有 @MqttService 的类遍历
            Class<?> clazz = entry.getValue().getClass();
            // 获取他所有方法
            Method[] methods = clazz.getDeclaredMethods();
            for ( Method method: methods ){
                if (method.isAnnotationPresent(MqttTopic.class)){
                	// 如果这个方法有 这个注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if(isMatch(receivedTopic,handleTopic.value())){
                    	// 并且 这个 topic 匹配成功
                        try {
                            method.invoke(SpringUtils.getBean(clazz),message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                            log.error("代理炸了");
                        } catch (InvocationTargetException e) {
                            log.error("执行 {} 方法出现错误",handleTopic.value(),e);
                        }
                    }
                }
            }
        }
    }


    /**
     * mqtt 订阅的主题与我实际的主题是否匹配
     * @param topic 是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern){

        if((topic==null) || (pattern==null) ){
            return false;
        }

        if(topic.equals(pattern)){
            // 完全相等是肯定匹配的
            return true;
        }

        if("#".equals(pattern)){
            // # 号代表所有主题  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");

        boolean match = true;

        // 如果包含 # 则只需要判断 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if(!"#".equals(splitPattern[i])){
                // 不是# 号 正常判断
                if(i>=splitTopic.length){
                    // 此时长度不相等 不匹配
                    match = false;
                    break;
                }
                if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            }
            else {
                // 是# 号  肯定匹配的
                break;
            }
        }

        return match;
    }

}
登入後複製

工具類別SpringUtils

#
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * spring工具类 方便在非spring管理环境中获取bean
 * 
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware 
{
    /** Spring应用上下文环境 */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;


    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{

        return beanFactory.getBeansWithAnnotation(clsName);
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 
    {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
    {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }

    /**
     * 获取aop代理对象
     * 
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }

    /**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

}
登入後複製

OK, 大功告成. 终于舒服了, 终于不用写if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:

  • 使用 Spring integration 在Springboot中集成Mqtt

  • Spring Integration(一)概述

附:

动态添加主题方式:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;

/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {

    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;


    public void addTopic(String topic) {
        addTopic(topic, 1);
    }

    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }

    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

}
登入後複製

直接调用就行

以上是springboot如何整合mqtt的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
威爾R.E.P.O.有交叉遊戲嗎?
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

MQTT在PHP開發中的容錯與安全性考慮 MQTT在PHP開發中的容錯與安全性考慮 Jul 08, 2023 am 11:34 AM

MQTT在PHP開發中的容錯與安全性考量概述:MQTT(MessageQueuingTelemetryTransport)是一種輕量級的通訊協議,被廣泛用於物聯網和機器對機器(M2M)通訊。在PHP開發中使用MQTT可以實現即時訊息傳遞和遠端控制等功能。本文將介紹在PHP開發中使用MQTT時需要考慮的容錯和安全性問題,並提供一些程式碼範例供參考。一、容錯

如何使用PHP和MQTT為網站新增即時使用者聊天功能 如何使用PHP和MQTT為網站新增即時使用者聊天功能 Jul 08, 2023 pm 07:46 PM

如何使用PHP和MQTT為網站添加即時使用者聊天功能在當今網路時代,網站使用者越來越需要即時的交流和溝通,為了滿足這種需求,我們可以使用PHP和MQTT來為網站添加即時使用者聊天功能。本文將介紹如何使用PHP和MQTT實現網站即時使用者聊天功能,並提供程式碼範例。確保環境準備在開始之前,確保你已經安裝並設定了PHP和MQTT的運作環境。你可以使用XAMPP等整合開發

PHP MQTT客戶端開髮指南 PHP MQTT客戶端開髮指南 Mar 27, 2024 am 09:21 AM

MQTT(MessageQueuingTelemetryTransport)是一種輕量級的訊息傳輸協議,通常用於物聯網設備之間的通訊。 PHP是一種常用的伺服器端程式語言,可以用來開發MQTT客戶端。本文將介紹如何使用PHP開發MQTT客戶端,並包含以下內容:MQTT協定的基本概念PHPMQTT客戶端程式庫的選取和使用實例:使用PHPMQTT客戶端發布和

SpringBoot與SpringMVC的比較及差別分析 SpringBoot與SpringMVC的比較及差別分析 Dec 29, 2023 am 11:02 AM

SpringBoot和SpringMVC都是Java開發中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執行

使用PHP和MQTT實現即時數據分析的最佳實踐 使用PHP和MQTT實現即時數據分析的最佳實踐 Jul 08, 2023 pm 05:57 PM

使用PHP和MQTT實現即時數據分析的最佳實踐隨著物聯網和大數據技術的迅速發展,即時數據分析在各行各業中變得越來越重要。在即時數據分析中,MQTT(MQTelemetryTransport)作為一種輕量級的通訊協議,被廣泛應用於物聯網領域。結合PHP和MQTT,可以快速、有效率地實現即時數據分析。本文將介紹使用PHP和MQTT實現即時數據分析的最佳實踐,並

使用PHP和MQTT建立即時聊天應用 使用PHP和MQTT建立即時聊天應用 Jul 08, 2023 pm 03:18 PM

使用PHP和MQTT建立即時聊天應用引言:隨著網路的快速發展和智慧型裝置的普及,即時通訊已經成為了現代社會中必不可少的功能之一。為了滿足人們的溝通需求,開發一個即時聊天應用程式已經成為了眾多開發者的追求目標。在本篇文章中,我們將介紹如何使用PHP和MQTT(MessageQueuingTelemetryTransport)協定來建立一個即時聊天應用。什麼是

SpringBoot+Dubbo+Nacos 開發實戰教程 SpringBoot+Dubbo+Nacos 開發實戰教程 Aug 15, 2023 pm 04:49 PM

本文來寫個詳細的例子來說下dubbo+nacos+Spring Boot開發實戰。本文不會講述太多的理論的知識,會寫一個最簡單的例子來說明dubbo如何與nacos整合,快速建構開發環境。

MQTT協議的PHP實現方案比較和選擇指南 MQTT協議的PHP實現方案比較和選擇指南 Jul 08, 2023 pm 10:43 PM

MQTT協定的PHP實作方案比較和選擇指南摘要:MQTT(MessageQueuingTelemetryTransport)是一種輕量級的發布/訂閱通訊協議,適用於物聯網等低頻寬、高延遲的環境。本文將探討MQTT協定在PHP中的實現方案,並提供比較和選擇指南。引言:隨著物聯網的快速發展,越來越多的設備需要即時資料傳輸和通訊。 MQTT作為一種輕量級的

See all articles