Heim > Java > Behandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter

Behandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter

WBOY
Freigeben: 2024-02-05 22:00:10
nach vorne
1283 Leute haben es durchsucht
Frageninhalt

Wir verfügen über einige Spring-Integrationsprozesse, um Nachrichten zu verarbeiten, die über mqtt oder stomp eingehen. Hierfür nutzen wir Adapter mqttpahomessagedrivenchanneladapterstompinboundchanneladapter. Im Fall von mqtt haben wir beobachtet, dass der Adapter die Verbindung schließt und keine Nachrichten mehr empfängt, wenn ein Endpunkt im Stream eine Ausnahme auslöst. Wenn wir den Proxy neu starten, wird die Verbindung zu ihm ebenfalls nicht wieder hergestellt.

Um Ausnahmen zu behandeln, setzen wir den Namen des Fehlerkanals auf den Wert, der standardmäßig vom Spring-Adapter „errorchannel“ verarbeitet wird. Unsere Absicht ist es, nur die Ausnahme zu protokollieren, ohne die zugrunde liegende Verbindung zu schließen. Ist dies die richtige Art, Ausnahmen während des gesamten Prozesses zu behandeln?

Was das Problem der erneuten Verbindung betrifft, haben wir für jedes Transportprotokoll unterschiedliche Methoden.

  • Für mqtt werden wir connectionoptionsautomaticreconnect 设置为 true:
var clientfactory = new defaultmqttpahoclientfactory();
clientfactory.getconnectionoptions().setautomaticreconnect(true);

var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic");
adapter.seterrorchannelname("errorchannel");
Nach dem Login kopieren
  • Für stampfen setzen wir taskscheduler 设置为 reactornettytcpstompclient in den Kontext:
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);

var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);

var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");
Nach dem Login kopieren

Ist dies der beste Weg, dieses Problem zu lösen?


Richtige Antwort


Ja, errorchannel 选项是抑制向 mqtt 客户端抛出异常的好方法。不必是全局 errorchannel ,它可能在许多不同的地方使用。 setautomaticreconnect(true) wird tatsächlich für Inbound-Channel-Adapter empfohlen.

Nicht verwendet in reactornettytcpstompclienttaskscheduler 不适用于重新连接。请参阅其 javadocs。我认为重新连接逻辑在 reactornettytcpstompclient:

public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) {
    connectionhandlingstompsession session = createsession(connectheaders, handler);
    this.tcpclient.connectasync(session);
    return session.getsession();
}
Nach dem Login kopieren

Fall der Wiederverbindung über eine andere Variante:

CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
Nach dem Login kopieren

Das obige ist der detaillierte Inhalt vonBehandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:stackoverflow.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage