Wir verfügen über einige Spring-Integrationsprozesse, um Nachrichten zu verarbeiten, die über mqtt oder stomp eingehen. Hierfür nutzen wir Adapter mqttpahomessagedrivenchanneladapter
和 stompinboundchanneladapter
.
Im Fall von mqtt haben wir beobachtet, dass der Adapter die Verbindung schließt und keine Nachrichten mehr empfängt, wenn ein Endpunkt im Stream eine Ausnahme auslöst. Wenn wir den Proxy neu starten, wird die Verbindung zu ihm ebenfalls nicht wieder hergestellt.
Um Ausnahmen zu behandeln, setzen wir den Namen des Fehlerkanals auf den Wert, der standardmäßig vom Spring-Adapter „errorchannel“ verarbeitet wird. Unsere Absicht ist es, nur die Ausnahme zu protokollieren, ohne die zugrunde liegende Verbindung zu schließen. Ist dies die richtige Art, Ausnahmen während des gesamten Prozesses zu behandeln?
Was das Problem der erneuten Verbindung betrifft, haben wir für jedes Transportprotokoll unterschiedliche Methoden.
connectionoptions
的 automaticreconnect
设置为 true
: var clientfactory = new defaultmqttpahoclientfactory(); clientfactory.getconnectionoptions().setautomaticreconnect(true); var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic"); adapter.seterrorchannelname("errorchannel");
taskscheduler
设置为 reactornettytcpstompclient
in den Kontext: var stompClient = new ReactorNettyTcpStompClient(host, port); stompClient.setTaskScheduler(taskScheduler); var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient); var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue"); adapter.setErrorChannelName("errorChannel");
Ist dies der beste Weg, dieses Problem zu lösen?
Ja, errorchannel
选项是抑制向 mqtt 客户端抛出异常的好方法。不必是全局 errorchannel
,它可能在许多不同的地方使用。 setautomaticreconnect(true)
wird tatsächlich für Inbound-Channel-Adapter empfohlen.
Nicht verwendet in reactornettytcpstompclient
的 taskscheduler
不适用于重新连接。请参阅其 javadocs。我认为重新连接逻辑在 reactornettytcpstompclient
:
public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) { connectionhandlingstompsession session = createsession(connectheaders, handler); this.tcpclient.connectasync(session); return session.getsession(); }
Fall der Wiederverbindung über eine andere Variante:
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
Das obige ist der detaillierte Inhalt vonBehandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!