首页 Java java教程 springboot如何整合mqtt

springboot如何整合mqtt

May 15, 2023 pm 04:25 PM
springboot mqtt

springboot 整合 mqtt

搭建的时候如果你使用的是集群 记得开放以下端口:

springboot如何整合mqtt

好了, 搭建成功下一步就是我们的java程序要与mqtt连接, 这里有两种方式(其实不止两种)进行连接.

一是 直接使用 MQTT Java 客户端库

二是使用 spring integration mqtt也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>
登录后复制

第二步 添加配置

1 先写好一些基本配置

mqtt:
 username: test                        # 账号
 password: 123456                      # 密码
 host-url: tcp://127.0.0.1:1883        # mqtt连接tcp地址
 in-client-id: ${random.value}         # 随机值,使出入站 client ID 不同
 out-client-id: ${random.value}
 client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value}
 default-topic: test/#,topic/+/+/up         # 默认主题
 timeout: 60                                # 超时时间
 keepalive: 60                              # 保持连接
 clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
登录后复制

2.然后写一个对应的类MqttProperties

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MqttProperties 
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Component
public class MqttProperties {

    /**
     * 用户名
     */
    @Value("${mqtt.username}")
    private String username;

    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 连接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /**
     * 进-客户Id
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /**
     * 出-客户Id
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /**
     * 客户Id
     */
    @Value("${mqtt.client-id}")
    private String clientId;

    /**
     * 默认连接话题
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * 超时时间
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * 保持连接数
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /**是否清除session*/
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

	// ...getter and setter

}
登录后复制

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道channel, 适配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
登录后复制

然后再搞两根管子(channel),一个出站,一个入站

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
登录后复制

为了使这些管子能流通 就需要一个适配器(adapter)

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }
登录后复制

然后定义消息生产者

    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }
登录后复制

那我们收到消息去哪里处理呢,答案是这里:

    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
    	// 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)
        return mqttMessageHandle;
		// 你也可以这样写
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };
登录后复制

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }
登录后复制

这个 出站处理器 在我看来就是让别人 (MqttPahoMessageHandler)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由MqttPahoMessageHandler来完成

接下来我们定义一个接口即可

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}
登录后复制

我们直接调用这个接口就可以向mqtt 发送数据

到目前为止,整个配置文件长这样:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Configuration
public class MqttConfig {


    /**
     *  以下属性将在配置文件中读取
     **/
    @Autowired
    private MqttProperties mqttProperties;


    //Mqtt 客户端工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }


    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }


    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return mqttMessageHandle;
    }

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }


    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
}
登录后复制

处理消息的 MqttMessageHandle

@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
     
    }
}
登录后复制

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的DirectChannel,是Spring Integration默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel

@Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        // 用线程池
        return new ExecutorChannel(mqttThreadPoolTaskExecutor());
    }
登录后复制

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫 Java DSL

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Configuration
public class MqttConfigV2 {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;


    //Mqtt 客户端工厂 所有客户端从这里产生
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

    // 消息生产者 (接收,处理来自mqtt的消息)
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 出站处理器 (向 mqtt 发送消息)
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {

        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }

}
登录后复制

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.

但其实我一直有个想法, 就是我们接收的消息 都是在 handleMessage这个方法里面执行的,

	@Override
    public void handleMessage(Message<?> message) throws MessagingException {
     			
    }
登录后复制

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用if ... else ...也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加Spring Integration的路由 router,根据不同topic路由到不同的channel, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了 spring@Controller的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加 @Controller这个注解, 就代表这个类是 http 接口, 再在方法加上 @RequestMapping就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加 @MqttService就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上 @MqttTopic就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}
登录后复制

加上 @Component注解 spring就会扫描, 并注册到IOC容器里

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {

    /**
     * 主题名字
     */
    String value() default "";

}
登录后复制

参考 @RequestMapping我们使用起来应该是这样的:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */
@MqttService
public class MqttTopicHandle {

    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

	// 这里的 # 号是通配符
    @MqttTopic("test/#")
    public void test(Message<?> message){
        log.info("test="+message.getPayload());
    }
	
	// 这里的 + 号是通配符
    @MqttTopic("topic/+/+/up")
    public void up(Message<?> message){
        log.info("up="+message.getPayload());
    }

	// 注意 你必须先订阅
    @MqttTopic("topic/1/2/down")
    public void down(Message<?> message){
        log.info("down="+message.getPayload());
    }
}
登录后复制

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带 @MqttService注解的类

然后 遍历这些类, 找到带有 @MqttTopic的方法

接着 把 @MqttTopic注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);

    // 包含 @MqttService注解 的类(Component)
    public static Map<String, Object> mqttServices;


    /**
     * 所有mqtt到达的消息都会在这里处理
     * 要注意这个方法是在线程池里面运行的
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }

    public Map<String, Object> getMqttServices(){
        if(mqttServices==null){
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }

    public void getMqttTopicService(Message<?> message){
        // 在这里 我们根据不同的 主题 分发不同的消息
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
        if(receivedTopic==null || "".equals(receivedTopic)){
            return;
        }
        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
        	// 把所有带有 @MqttService 的类遍历
            Class<?> clazz = entry.getValue().getClass();
            // 获取他所有方法
            Method[] methods = clazz.getDeclaredMethods();
            for ( Method method: methods ){
                if (method.isAnnotationPresent(MqttTopic.class)){
                	// 如果这个方法有 这个注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if(isMatch(receivedTopic,handleTopic.value())){
                    	// 并且 这个 topic 匹配成功
                        try {
                            method.invoke(SpringUtils.getBean(clazz),message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                            log.error("代理炸了");
                        } catch (InvocationTargetException e) {
                            log.error("执行 {} 方法出现错误",handleTopic.value(),e);
                        }
                    }
                }
            }
        }
    }


    /**
     * mqtt 订阅的主题与我实际的主题是否匹配
     * @param topic 是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern){

        if((topic==null) || (pattern==null) ){
            return false;
        }

        if(topic.equals(pattern)){
            // 完全相等是肯定匹配的
            return true;
        }

        if("#".equals(pattern)){
            // # 号代表所有主题  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");

        boolean match = true;

        // 如果包含 # 则只需要判断 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if(!"#".equals(splitPattern[i])){
                // 不是# 号 正常判断
                if(i>=splitTopic.length){
                    // 此时长度不相等 不匹配
                    match = false;
                    break;
                }
                if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            }
            else {
                // 是# 号  肯定匹配的
                break;
            }
        }

        return match;
    }

}
登录后复制

工具类 SpringUtils

import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * spring工具类 方便在非spring管理环境中获取bean
 * 
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware 
{
    /** Spring应用上下文环境 */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;


    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{

        return beanFactory.getBeansWithAnnotation(clsName);
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 
    {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
    {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }

    /**
     * 获取aop代理对象
     * 
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }

    /**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

}
登录后复制

OK, 大功告成. 终于舒服了, 终于不用写if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:

  • 使用 Spring integration 在Springboot中集成Mqtt

  • Spring Integration(一)概述

附:

动态添加主题方式:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;

/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {

    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;


    public void addTopic(String topic) {
        addTopic(topic, 1);
    }

    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }

    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

}
登录后复制

直接调用就行

以上是springboot如何整合mqtt的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

MQTT在PHP开发中的容错与安全性考虑 MQTT在PHP开发中的容错与安全性考虑 Jul 08, 2023 am 11:34 AM

MQTT在PHP开发中的容错与安全性考虑概述:MQTT(MessageQueuingTelemetryTransport)是一种轻量级的通信协议,被广泛用于物联网和机器对机器(M2M)通信。在PHP开发中使用MQTT可以实现实时消息传递和远程控制等功能。本文将介绍在PHP开发中使用MQTT时需要考虑的容错和安全性问题,并提供一些代码示例供参考。一、容错

如何使用PHP和MQTT为网站添加实时用户聊天功能 如何使用PHP和MQTT为网站添加实时用户聊天功能 Jul 08, 2023 pm 07:46 PM

如何使用PHP和MQTT为网站添加实时用户聊天功能在当今互联网时代,网站用户越来越需要实时的交流和沟通,为了满足这种需求,我们可以使用PHP和MQTT来为网站添加实时用户聊天功能。本文将介绍如何使用PHP和MQTT实现网站实时用户聊天功能,并提供代码示例。确保环境准备在开始之前,确保你已经安装并配置了PHP和MQTT的运行环境。你可以使用XAMPP等集成开发

PHP MQTT客户端开发指南 PHP MQTT客户端开发指南 Mar 27, 2024 am 09:21 AM

MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,通常用于物联网设备之间的通信。PHP是一种常用的服务器端编程语言,可以用来开发MQTT客户端。本文将介绍如何使用PHP开发MQTT客户端,并包含以下内容:MQTT协议的基本概念PHPMQTT客户端库的选取和使用实例:使用PHPMQTT客户端发布和

SpringBoot与SpringMVC的比较及差别分析 SpringBoot与SpringMVC的比较及差别分析 Dec 29, 2023 am 11:02 AM

SpringBoot和SpringMVC都是Java开发中常用的框架,但它们之间有一些明显的差异。本文将探究这两个框架的特点和用途,并对它们的差异进行比较。首先,我们来了解一下SpringBoot。SpringBoot是由Pivotal团队开发的,它旨在简化基于Spring框架的应用程序的创建和部署。它提供了一种快速、轻量级的方式来构建独立的、可执行

使用PHP和MQTT实现实时数据分析的最佳实践 使用PHP和MQTT实现实时数据分析的最佳实践 Jul 08, 2023 pm 05:57 PM

使用PHP和MQTT实现实时数据分析的最佳实践随着物联网和大数据技术的迅猛发展,实时数据分析在各行各业中变得越来越重要。在实时数据分析中,MQTT(MQTelemetryTransport)作为一种轻量级的通信协议,被广泛应用于物联网领域。结合PHP和MQTT,可以快速、高效地实现实时数据分析。本文将介绍使用PHP和MQTT实现实时数据分析的最佳实践,并

使用PHP和MQTT构建实时聊天应用 使用PHP和MQTT构建实时聊天应用 Jul 08, 2023 pm 03:18 PM

使用PHP和MQTT构建实时聊天应用引言:随着互联网的快速发展和智能设备的普及,实时通讯已经成为了现代社会中必不可少的功能之一。为了满足人们的沟通需求,开发一个实时聊天应用已经成为了众多开发者的追求目标。在本篇文章中,我们将介绍如何使用PHP和MQTT(MessageQueuingTelemetryTransport)协议来构建一个实时聊天应用。什么是

SpringBoot+Dubbo+Nacos 开发实战教程 SpringBoot+Dubbo+Nacos 开发实战教程 Aug 15, 2023 pm 04:49 PM

本文来写个详细的例子来说下dubbo+nacos+Spring Boot开发实战。本文不会讲述太多的理论的知识,会写一个最简单的例子来说明dubbo如何与nacos整合,快速搭建开发环境。

MQTT协议的PHP实现方案比较和选择指南 MQTT协议的PHP实现方案比较和选择指南 Jul 08, 2023 pm 10:43 PM

MQTT协议的PHP实现方案比较和选择指南摘要:MQTT(MessageQueuingTelemetryTransport)是一种轻量级的发布/订阅通信协议,适用于物联网等低带宽、高延迟的环境。本文将探讨MQTT协议在PHP中的实现方案,并提供比较和选择指南。引言:随着物联网的迅猛发展,越来越多的设备需要进行实时数据传输和通信。MQTT作为一种轻量级的

See all articles