Handle underlying MQTT and STOMP connections using the Spring Integration adapter

WBOY
Release: 2024-02-05 22:00:10
Question content

We have some Spring Integration flows that handle messages arriving via MQTT or STOMP. For this we use the adapters MqttPahoMessageDrivenChannelAdapter and StompInboundChannelAdapter. In the case of MQTT, we observed that if any endpoint in the flow throws an exception, the adapter closes the connection and no longer receives messages. Likewise, if we restart the broker, the connection to it is not established again.

To handle exceptions, we set the adapter's error channel name to "errorChannel", the default error channel that Spring handles. Our intention is only to log the exception without closing the underlying connection. Is this the correct way to handle exceptions throughout the flow?

Regarding the reconnection issue, we take a different approach for each transport protocol.

  • For MQTT, we set automaticReconnect to true on the connection options:
var clientFactory = new DefaultMqttPahoClientFactory();
clientFactory.getConnectionOptions().setAutomaticReconnect(true);

var adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", MqttAsyncClient.generateClientId(), clientFactory, "/topic/mytopic");
adapter.setErrorChannelName("errorChannel");
  • For STOMP, we set the TaskScheduler from the context on the ReactorNettyTcpStompClient:
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);

var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);

var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");

Is this the best way to handle this problem?


Correct answer


Yes, the errorChannel option is a good way to keep exceptions from being thrown back to the MQTT client. It does not have to be the global errorChannel, since that one may be used in many different places. setAutomaticReconnect(true) is indeed recommended for inbound channel adapters.
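
To illustrate why routing failures to an error channel keeps the client alive, here is a minimal plain-Java sketch of the idea. It is not Spring Integration's implementation: ErrorRoutingSketch, guard, and demo are hypothetical names, and the two consumers merely stand in for the downstream flow and the error-channel subscriber. The wrapper catches what the downstream handler throws and hands it to the error handler, so the caller (standing in for the MQTT client callback) never sees the exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ErrorRoutingSketch {

    /**
     * Wraps a downstream handler so that any runtime exception it throws
     * is passed to errorHandler instead of propagating to the caller.
     */
    public static <T> Consumer<T> guard(Consumer<T> downstream, Consumer<Throwable> errorHandler) {
        return payload -> {
            try {
                downstream.accept(payload);
            } catch (RuntimeException ex) {
                errorHandler.accept(ex);
            }
        };
    }

    /** Feeds three payloads through a guarded handler; the second one fails. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<String> flow = payload -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            log.add("handled " + payload);
        };
        // The error handler only logs, mirroring the intent stated in the question.
        Consumer<String> guarded = guard(flow, ex -> log.add("error: " + ex.getMessage()));
        guarded.accept("a");
        guarded.accept("");
        guarded.accept("b");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The third payload is still handled after the second one fails, which is the behavior the question is after: the failure is logged and message delivery continues.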

The TaskScheduler of ReactorNettyTcpStompClient has nothing to do with reconnection. See its Javadocs. I think the reconnection logic is not used in ReactorNettyTcpStompClient:

public CompletableFuture<StompSession> connectAsync(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
    ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
    this.tcpClient.connectAsync(session);
    return session.getSession();
}

Reconnection, by contrast, is done via another connectAsync variant, which takes a ReconnectStrategy:

CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
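
To make the role of a reconnect strategy concrete, here is a minimal plain-Java sketch of the concept. It does not use Spring's classes: ReconnectSketch, Strategy, fixedInterval, and connectWithRetry are hypothetical names chosen for illustration, and the BooleanSupplier stands in for whatever call actually opens the connection. The strategy decides, after each failed attempt, how long to wait before trying again, or signals that no further attempt should be made.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class ReconnectSketch {

    /** Hypothetical strategy: delay in ms before the next attempt, or null to stop. */
    public interface Strategy {
        Long timeToNextAttempt(int failedAttempts);
    }

    /** Waits intervalMs between attempts and allows at most maxAttempts attempts in total. */
    public static Strategy fixedInterval(long intervalMs, int maxAttempts) {
        return failedAttempts -> failedAttempts < maxAttempts ? Long.valueOf(intervalMs) : null;
    }

    /**
     * Calls connect until it returns true or the strategy gives up.
     * Returns true if a connection attempt succeeded.
     */
    public static boolean connectWithRetry(BooleanSupplier connect, Strategy strategy) {
        int failedAttempts = 0;
        while (!connect.getAsBoolean()) {
            failedAttempts++;
            Long delay = strategy.timeToNextAttempt(failedAttempts);
            if (delay == null) {
                return false; // the strategy gave up
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return false;
            }
        }
        return true;
    }

    /** Simulates a broker that accepts the third connection attempt. */
    public static int demo() {
        AtomicInteger calls = new AtomicInteger();
        boolean connected = connectWithRetry(() -> calls.incrementAndGet() == 3, fixedInterval(10, 5));
        return connected ? calls.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("connected after " + demo() + " attempts");
    }
}
```

A blocking loop keeps the sketch short; in a real flow the delay would more likely be scheduled on a task scheduler than slept on the calling thread.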

source:stackoverflow.com