
How springboot integrates mqtt

May 15, 2023 pm 04:25 PM
springboot mqtt

springboot integrates mqtt

When setting up the broker, if you are using a cluster, remember to open the ports it requires.

Once the broker has been set up successfully, the next step is to connect our Java program to MQTT. There are two ways to connect (actually more than two):

1. Use the MQTT Java client library directly.

2. Use Spring Integration MQTT. This is the recommended approach, and it is the one we focus on.

The first step is to add the Maven dependency

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>

The second step is to add configuration

1. First, write some basic configuration

mqtt:
 username: test                        # username
 password: 123456                      # password
 host-url: tcp://127.0.0.1:1883        # TCP address of the MQTT broker
 in-client-id: ${random.value}         # random value, so that the inbound and outbound client IDs differ
 out-client-id: ${random.value}
 client-id: ${random.int}                   # client ID; must be unique, so a random number is used
 default-topic: test/#,topic/+/+/up         # default topics
 timeout: 60                                # timeout
 keepalive: 60                              # keep-alive interval
 clearSession: true                         # clear session (if false, the original session is reused after a reconnect: subscriptions are kept and messages sent while offline are received)

2. Then write a corresponding class, MqttProperties

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MqttProperties 
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Component
public class MqttProperties {

    /**
     * Username
     */
    @Value("${mqtt.username}")
    private String username;

    /**
     * Password
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * Broker address
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /**
     * Inbound client ID
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /**
     * Outbound client ID
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /**
     * Client ID
     */
    @Value("${mqtt.client-id}")
    private String clientId;

    /**
     * Default topics to subscribe to
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * Timeout
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * Keep-alive interval
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /** Whether to clear the session */
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

	// ...getter and setter

}

The next step is to configure a number of pieces. Many concepts are involved here, such as channels, adapters, inbound, and outbound, and at first they can be quite a headache.

Okay, let’s do it one by one.

First of all, connecting to MQTT requires a client, so we create a client factory, which can produce as many clients as we need.

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
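The YAML file above also defines timeout, keepalive, and clearSession, but the factory shown here does not apply them. If you want them to take effect, they could be passed to the Paho MqttConnectOptions roughly as follows. This is only a sketch under assumptions, not the author's configuration: the class name MqttConnectOptionsSketch and the build method are hypothetical, and the values are expected to come from MqttProperties.

```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

// Hypothetical helper, not part of the original configuration class.
final class MqttConnectOptionsSketch {
    private MqttConnectOptionsSketch() {}

    static MqttConnectOptions build(String hostUrls, String username, String password,
                                    int timeoutSeconds, int keepAliveSeconds, boolean cleanSession) {
        MqttConnectOptions options = new MqttConnectOptions();
        // One or more broker URIs, comma-separated as in mqtt.host-url
        options.setServerURIs(hostUrls.split(","));
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // mqtt.timeout: how long to wait for the connection to be established
        options.setConnectionTimeout(timeoutSeconds);
        // mqtt.keepalive: maximum interval between control packets
        options.setKeepAliveInterval(keepAliveSeconds);
        // mqtt.clearSession: false asks the broker to keep the session across reconnects
        options.setCleanSession(cleanSession);
        return options;
    }
}
```

Keep in mind that a retained session is tied to the client ID, so setting clearSession to false only helps if the client ID stays the same across restarts, which the random IDs in the sample configuration do not.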

Then create two channels, one outbound and one inbound

    // Outbound message channel
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    // Inbound message channel
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }

To get messages flowing into these channels, an adapter is needed

    // MQTT channel adapter
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

Then define the message producer

    // Message producer
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        // Channel that inbound messages are delivered to
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }

So where do we process a message once it has been received? The answer is here:

    @Bean
    // @ServiceActivator sets mqttInboundChannel as the input channel: messages delivered to mqttInboundChannel are received and handled by this handler
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        // mqttMessageHandle is simply an implementation of MessageHandler (the class is shown further down)
        return mqttMessageHandle;
        // You could also write it like this:
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };
    }

At this point we can actually receive messages from MQTT.

Next, configure sending messages to mqtt

Configure the outbound processor

    // Outbound processor
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

As I see it, this outbound processor means that I leave the sending to someone else (MqttPahoMessageHandler) instead of handling it myself. I only care about what I want to send; how it gets sent is taken care of by MqttPahoMessageHandler.
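One thing worth checking here: the default topic is taken from the first entry of mqtt.default-topic, which in the sample configuration is test/#. MQTT allows the wildcards + and # only in subscription filters, not in the topic of a published message, so a default like that is only safe as long as every send passes an explicit topic. A minimal guard in plain Java might look like the sketch below; TopicNames, isPublishable, and firstPublishable are hypothetical helper names, not part of Spring Integration or Paho.

```java
// Hypothetical helper for telling publishable topic names apart from subscription filters.
final class TopicNames {
    private TopicNames() {}

    // True if the string can be used as the topic of a published message:
    // non-empty and free of the subscription wildcards '+' and '#'.
    static boolean isPublishable(String topic) {
        return topic != null
                && !topic.isEmpty()
                && topic.indexOf('+') < 0
                && topic.indexOf('#') < 0;
    }

    // Returns the first publishable entry of a comma-separated topic list,
    // or null if every entry contains a wildcard.
    static String firstPublishable(String commaSeparatedTopics) {
        for (String entry : commaSeparatedTopics.split(",")) {
            String trimmed = entry.trim();
            if (isPublishable(trimmed)) {
                return trimmed;
            }
        }
        return null;
    }
}
```

With the sample value test/#,topic/+/+/up, firstPublishable returns null, which suggests configuring a separate, concrete topic for publishing.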

Next we define an interface

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}

We can directly call this interface to send data to mqtt
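As a rough illustration of what the calling code might look like, here is a minimal sketch. So that it compiles without Spring on the classpath, the gateway is re-declared as a plain interface with the same method shapes (the real one carries the @MessagingGateway and @Header annotations shown above). MqttGatewaySketch, DeviceCommandService, sendCommand, and the topic/<productId>/<deviceId>/down layout are hypothetical names chosen for the example.

```java
// Plain-Java stand-in for the MqttGateway interface above (annotations omitted).
interface MqttGatewaySketch {
    void sendToMqtt(String topic, String data);

    void sendToMqtt(String topic, Integer qos, String data);
}

// Hypothetical service that publishes a command to a single device.
class DeviceCommandService {
    private final MqttGatewaySketch gateway;

    DeviceCommandService(MqttGatewaySketch gateway) {
        this.gateway = gateway;
    }

    // Builds a concrete topic (no wildcards) and publishes the payload with QoS 1.
    void sendCommand(String productId, String deviceId, String payload) {
        String topic = "topic/" + productId + "/" + deviceId + "/down";
        gateway.sendToMqtt(topic, 1, payload);
    }
}
```

In the real application the gateway would be injected by Spring, and the topic and QoS arguments end up in the message headers that MqttPahoMessageHandler reads.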

So far, the entire configuration file looks like this:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Configuration
public class MqttConfig {


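    /**
     *  The properties below are read from the configuration file
     **/
    @Autowired
    private MqttProperties mqttProperties;

    // The MessageHandler implementation returned by handleMessage() below
    @Autowired
    private MqttMessageHandle mqttMessageHandle;


    // MQTT client factory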
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // MQTT channel adapter
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }


    // Message producer
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        // Channel that inbound messages are delivered to
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }


    // Outbound processor
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

    @Bean
    // @ServiceActivator sets mqttInboundChannel as the input channel: messages delivered to mqttInboundChannel are received and handled by this handler
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return mqttMessageHandle;
    }

    // Outbound message channel
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }


    // Inbound message channel
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
}

The MqttMessageHandle class that processes the messages:

@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
     
    }
}

After digging further, I found that there is room for optimization. For example, there are many types of channels. The DirectChannel used here is Spring Integration's default message channel: it sends each message to a single subscriber and blocks the sender until that subscriber has handled the message. Delivery is synchronous and runs on a single thread, the sender's.
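The difference between the two dispatch styles can be shown without Spring at all. The sketch below is an analogy in plain java.util.concurrent, not Spring Integration code: a "direct" dispatch runs the handler on the sending thread, while an "executor" dispatch hands it to a pool thread. DispatchDemo and handlerThreadName are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DispatchDemo {
    private DispatchDemo() {}

    // Runs a "handler" through the given executor and reports which thread executed it.
    static String handlerThreadName(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }

    public static void main(String[] args) {
        // Direct dispatch: the handler runs on the caller's thread, so the sender waits for it.
        Executor direct = Runnable::run;
        System.out.println("direct   -> " + handlerThreadName(direct));

        // Executor dispatch: the handler runs on a pool thread instead of the sender's thread.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("executor -> " + handlerThreadName(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

For an MQTT consumer this matters because a slow handler on a direct dispatch holds up the thread that delivers incoming messages.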

Here we can change the inbound channel to ExecutorChannel, a multi-threaded channel

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Maximum number of threads that can be created
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // Core pool size
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // Maximum queue length
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // Idle time, in seconds, after which surplus pool threads are released
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Policy for rejected tasks (no thread available)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // Inbound message channel
    @Bean
    public MessageChannel mqttInboundChannel(){
        // Use the thread pool
        return new ExecutorChannel(mqttThreadPoolTaskExecutor());
    }

At this point everything actually runs.
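One detail of the thread pool that is easy to overlook is the CallerRunsPolicy rejection handler: when all threads are busy and the queue is full, the task is not dropped but executed by the thread that submitted it. The following plain-JDK sketch shows that behaviour on a deliberately tiny pool (one thread, one queue slot); CallerRunsDemo and threadThatRunsOverflowTask are hypothetical names, and the sizes are chosen only to make the overflow easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {
    private CallerRunsDemo() {}

    // Saturates a 1-thread pool with a 1-slot queue, submits one more task,
    // and returns the name of the thread that ran that extra task.
    static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker until the latch is released.
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 is rejected by the pool; CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return overflowThread.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Applied to the configuration above, this would mean that under sustained overload the thread delivering MQTT messages ends up running the handler itself, which slows intake down rather than losing messages.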

But this configuration is rather long and a bit messy, so I searched the official documentation and found a simpler configuration style called the Java DSL.

Following the official documentation, with slight modifications, here is the configuration using the DSL:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Configuration
public class MqttConfigV2 {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;


    // MQTT client factory; all clients are produced here
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // MQTT channel adapter
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

    // Message producer (receives and handles messages from MQTT)
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Maximum number of threads that can be created
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // Core pool size
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // Maximum queue length
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // Idle time, in seconds, after which surplus pool threads are released
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Policy for rejected tasks (no thread available)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // Outbound processor (sends messages to MQTT)
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {

        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }

}

This really does look much simpler and is far less of a headache. If only I had known earlier.

That covers the configuration. At this point, the integration of Spring Boot and MQTT is actually complete.

One thing has been on my mind, though: every message we receive is handled in the handleMessage method,

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {

    }

So I wondered: can messages be handled by different methods depending on the topic they arrive on? You could of course implement this with if ... else ..., but if I subscribe to many topics, that becomes a headache to write.

There are two approaches to this problem. One is to add a Spring Integration router that routes messages to different channels according to their topic. I am not sure whether this can be made to work, so I won't discuss it here.

The second, for which I don't have a good name, is modeled on the design of Spring's @Controller; let's call it annotation mode.

As we all know, we add the @Controller annotation to a class to mark it as an HTTP interface, and then add @RequestMapping to its methods so that different URLs invoke different methods.

Following that design, we add @MqttService to a class to mark it as a service class dedicated to handling MQTT messages.
At the same time, we add @MqttTopic to a method of that class to indicate that the given topic is handled by that method.

OK, the theory is there, the next step is practice.

First define two annotations

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}

Because the annotation carries @Component, Spring will scan the annotated class and register it in the IoC container

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {

    /**
     * Topic name
     */
    String value() default "";

}

Modeled on @RequestMapping, we would use it like this:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */
@MqttService
public class MqttTopicHandle {

    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

	// The # here is a wildcard
    @MqttTopic("test/#")
    public void test(Message<?> message){
        log.info("test="+message.getPayload());
    }
	
	// The + here is a wildcard
    @MqttTopic("topic/+/+/up")
    public void up(Message<?> message){
        log.info("up="+message.getPayload());
    }

	// Note: you must subscribe to this topic first
    @MqttTopic("topic/1/2/down")
    public void down(Message<?> message){
        log.info("down="+message.getPayload());
    }
}

OK, the next step is to implement this usage.

Analysis:

When a message is received, we find all classes annotated with @MqttService in the IoC container,

then traverse these classes and find the methods annotated with @MqttTopic,

and then compare the value of each @MqttTopic annotation with the topic the message was received on.

If they match, we execute that method.

Enough talk, on to the code.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);

    // Beans whose class carries the @MqttService annotation (components)
    public static Map<String, Object> mqttServices;


    /**
     * Every message arriving from MQTT is handled here
     * Note that this method runs inside the thread pool
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }

    public Map<String, Object> getMqttServices(){
        if(mqttServices==null){
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }

    public void getMqttTopicService(Message<?> message){
        // Dispatch the message according to the topic it was received on
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
        if(receivedTopic==null || "".equals(receivedTopic)){
            return;
        }
        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
            // Iterate over every bean annotated with @MqttService
            Class<?> clazz = entry.getValue().getClass();
            // Get all of its methods
            Method[] methods = clazz.getDeclaredMethods();
            for ( Method method: methods ){
                if (method.isAnnotationPresent(MqttTopic.class)){
                    // The method carries the annotation
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if(isMatch(receivedTopic,handleTopic.value())){
                        // ... and the topic matches
                        try {
                            method.invoke(SpringUtils.getBean(clazz),message);
                            return;
                        } catch (IllegalAccessException e) {
                            log.error("Cannot access the handler method for {}",handleTopic.value(),e);
                        } catch (InvocationTargetException e) {
                            log.error("Error while executing the handler method for {}",handleTopic.value(),e);
                        }
                    }
                }
            }
        }
    }


    /**
     * Whether the topic a message arrived on matches a subscribed topic
     * @param topic the actual topic
     * @param pattern the subscribed topic, which may contain wildcards
     * @return whether they match
     */
    public static boolean isMatch(String topic, String pattern){

        if((topic==null) || (pattern==null) ){
            return false;
        }

        if(topic.equals(pattern)){
            // Identical strings always match
            return true;
        }

        if("#".equals(pattern)){
            // "#" alone stands for all topics, so it always matches
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");

        for (int i = 0; i < splitPattern.length; i++) {
            if("#".equals(splitPattern[i])){
                // "#" covers all remaining levels, so only the levels before it need checking
                return true;
            }
            if(i>=splitTopic.length){
                // The pattern has more levels than the topic: no match
                return false;
            }
            if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
                // The levels differ and the pattern level is not "+"
                return false;
            }
        }

        // Without "#", the topic must have exactly as many levels as the pattern
        // (otherwise "topic/+" would wrongly match "topic/a/b")
        return splitTopic.length == splitPattern.length;
    }

}
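The wildcard rules are easiest to check in isolation. Below is a standalone sketch of a matcher in plain Java with a few sample calls; it follows the same idea as isMatch and additionally requires that, when the pattern contains no #, topic and pattern have the same number of levels. TopicMatcher and matches are hypothetical names used only for this example.

```java
final class TopicMatcher {
    private TopicMatcher() {}

    // Returns true if the concrete topic is covered by the subscription pattern.
    static boolean matches(String topic, String pattern) {
        if (topic == null || pattern == null) {
            return false;
        }
        String[] topicLevels = topic.split("/");
        String[] patternLevels = pattern.split("/");
        for (int i = 0; i < patternLevels.length; i++) {
            if ("#".equals(patternLevels[i])) {
                // Multi-level wildcard: everything from here on matches.
                return true;
            }
            if (i >= topicLevels.length) {
                // The pattern has more levels than the topic.
                return false;
            }
            if (!"+".equals(patternLevels[i]) && !patternLevels[i].equals(topicLevels[i])) {
                // Neither a single-level wildcard nor an equal level.
                return false;
            }
        }
        // Without '#', both sides must have the same number of levels.
        return topicLevels.length == patternLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("test/a/b", "test/#"));                 // true
        System.out.println(matches("test", "test/#"));                     // true
        System.out.println(matches("topic/1/2/up", "topic/+/+/up"));       // true
        System.out.println(matches("topic/1/2/down", "topic/+/+/up"));     // false
        System.out.println(matches("topic/1/2/up/extra", "topic/+/+/up")); // false
        System.out.println(matches("topic/1", "topic/+/+/up"));            // false
    }
}
```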

The utility class SpringUtils

import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Spring utility class, convenient for obtaining beans from code that is not managed by Spring
 * 
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware 
{
    /** Spring application context environment */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;


    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{

        return beanFactory.getBeansWithAnnotation(clsName);
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 
    {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
    {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * Get a bean by name
     *
     * @param name
     * @return Object an instance of the bean registered under the given name
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }

    /**
     * Get the bean of type requiredType
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * Returns true if the BeanFactory contains a bean definition matching the given name
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }

    /**
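     * Determines whether the bean definition registered under the given name is a singleton or a prototype. If no bean definition is found for the given name, an exception (NoSuchBeanDefinitionException) is thrown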
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class the type of the registered object
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }

    /**
     * Returns the aliases of the given bean name, if it has any in the bean definition
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }

    /**
     * Get the AOP proxy object
     * 
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }

    /**
     * Get the active profiles; returns an empty array if none are set
     *
     * @return the active profiles
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

}

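OK, all done. What a relief: no more if...else... to write. Personally, I feel that handling messages this way is more elegant. And what matters most when writing code? Elegance~

That's all!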

Appendix:

Adding topics dynamically:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;

/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {

    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;


    public void addTopic(String topic) {
        addTopic(topic, 1);
    }

    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }

    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

}

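Just call these methods directly. Note that this class has the same simple name as the @MqttService annotation defined earlier, so the two must live in different packages (or one of them needs a different name).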
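One possible way to combine this with the annotation approach is to collect the topics declared on handler methods at startup and pass each of them to addTopic, so that a handler such as the one for topic/1/2/down does not depend on the topic also being listed in the configuration file. The sketch below shows only the collection step, in plain Java; Topic is a local stand-in for the @MqttTopic annotation, and SampleHandlers, TopicScanner, and declaredTopics are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

// Local stand-in for the @MqttTopic annotation defined earlier.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Topic {
    String value() default "";
}

// Hypothetical handler class, shaped like MqttTopicHandle.
class SampleHandlers {
    @Topic("test/#")
    public void test(String payload) { }

    @Topic("topic/1/2/down")
    public void down(String payload) { }

    public void notAHandler() { }
}

final class TopicScanner {
    private TopicScanner() {}

    // Collects the distinct, non-empty topic filters declared on a class's methods,
    // sorted alphabetically.
    static Set<String> declaredTopics(Class<?> type) {
        Set<String> topics = new TreeSet<>();
        for (Method method : type.getDeclaredMethods()) {
            Topic topic = method.getAnnotation(Topic.class);
            if (topic != null && !topic.value().isEmpty()) {
                topics.add(topic.value());
            }
        }
        return topics;
    }
}
```

Each entry of the returned set could then be handed to the addTopic method above, which already skips topics the adapter is subscribed to.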
