Bei der Arbeit stoßen wir häufig auf Geschäftsszenarien, die eine asynchrone Nachrichtenverarbeitung erfordern. Je nach Art der Nachricht gibt es völlig unterschiedliche Verarbeitungsmethoden.
1. Nachrichten sind nicht unabhängig
Unabhängige Nachrichten haben normalerweise sequentielle Abhängigkeiten. Zu diesem Zeitpunkt degeneriert der Nachrichtenverarbeitungsmechanismus in einen linearen Warteschlangenverarbeitungsmodus und kann nur verbraucht werden von einem Verbraucher. Oder gehen Sie zu einem einzelnen Thread, um Nachrichten zu verarbeiten.
2. Nachrichten sind völlig unabhängig
Völlig unabhängige Nachrichten können gleichzeitig von mehreren Verbrauchern (Threads) verarbeitet werden, wodurch maximale gleichzeitige Verarbeitungsfunktionen erreicht werden.
3. Nachrichten sind nicht völlig unabhängig
Normalerweise ist dies der Fall, wenn Nachrichten von derselben Quelle (vom selben Produzenten) bestellt werden müssen und die Reihenfolge von Nachrichten aus verschiedenen Quellen sind irrelevant.
Die Nachrichtenverarbeitung ist in diesem Szenario relativ kompliziert. Um die Reihenfolge der Nachrichten aus derselben Quelle sicherzustellen, ist es einfach, feste Verbraucherthreads an Nachrichten aus derselben Quelle zu binden ist sehr einfach, hat aber viele Fragen.
Wenn die Anzahl der Produzenten groß ist, reicht die Anzahl der gebundenen Threads möglicherweise nicht aus. Natürlich können Thread-Ressourcen wiederverwendet werden und derselbe Thread kann zur Verarbeitung an mehrere Nachrichtenquellen gebunden werden verursachen ein weiteres Problem: Nachrichteninteraktionen zwischen Quellen.
Stellen Sie sich das folgende Szenario vor:
Produzent P1 generiert eine große Anzahl von Nachrichten, tritt in die Warteschlange ein und wird dem Verbraucherthread C1 zur Verarbeitung zugewiesen (die Verarbeitung von C1 kann lange dauern Zu diesem Zeitpunkt hat der Produzent P2 eine Nachricht generiert, diese wurde jedoch leider auch dem Verbraucher-Thread C1 zur Verarbeitung zugewiesen
Dann wird die Nachrichtenverarbeitung des Produzenten P2 durch die große Anzahl von blockiert Nachrichten von P1, was zu einer Lücke zwischen P1 und P2 führt. Die gegenseitige Beeinflussung und andere Verbrauchsthreads können nicht vollständig genutzt werden, was zu einem Ungleichgewicht führt.
Wir müssen also darüber nachdenken, solche Probleme zu vermeiden. Erreichen Sie die Aktualität der Verbrauchsverarbeitung (so schnell wie möglich), die Isolation (Vermeidung gegenseitiger Beeinträchtigung) und das Gleichgewicht (Maximierung der gleichzeitigen Verarbeitung). Thread-Versandmodell (PUSH-Methode), die spezifische Methode ist normalerweise wie folgt:
1 Es gibt einen globalen Nachrichten-Dispatcher, der die Warteschlange abfragt, um Nachrichten abzurufen.
2. Senden Sie die Nachricht je nach Quelle zur Verarbeitung an den entsprechenden Verbraucherthread.
Der Mechanismus des Verteilungsalgorithmus kann so einfach sein wie ein Hash basierend auf der Nachrichtenquelle oder so komplex wie die aktuelle Auslastung jedes Verbraucherthreads, die Länge der Warteschlange und die Komplexität der Nachricht. und können auf Basis einer umfassenden Analyse für den Vertrieb ausgewählt werden.
Simple Hash wird definitiv auf die im obigen Szenario beschriebenen Probleme stoßen, aber komplexe Verteilungsberechnungen sind offensichtlich sehr mühsam und kompliziert zu implementieren, und die Effizienz ist nicht unbedingt gut. Es ist auch schwierig, eine perfekte zu erreichen Gleichgewicht im Sinne des Gleichgewichts.
Der zweite Modus verwendet die PULL-Methode und der Thread zieht bei Bedarf die folgende:
1 Die Nachrichtenquelle fügt die generierte Nachricht direkt ein temporär entsprechend der Quelle. Warteschlange (wie unten gezeigt, stellt jede Sitzung eine andere Nachrichtenquelle dar) und stellt die Sitzung dann in eine Blockierungswarteschlange, um den Thread zur Verarbeitung zu benachrichtigen
2 gleichzeitig die Warteschlange, um um Nachrichten zu konkurrieren (Stellen Sie sicher, dass nur ein Thread sie erhält
3. Überprüfen Sie, ob der Warteschlangenindikator von anderen Threads verarbeitet wird (die Implementierung erfordert eine Erkennungssynchronisierung basierend auf Nachrichten gleichen Ursprungs). auf Thread-Ebene)
4. Wenn nicht von anderen Threads verarbeitet, geben Sie den Status in der Synchronisierungsbereichseinstellungsverarbeitung an und verarbeiten Sie die Nachrichten in der temporären Warteschlange nach dem Verlassen des Synchronisierungsbereichs
# 🎜🎜#5. Nachdem die Verarbeitung abgeschlossen ist, betreten Sie schließlich erneut den Synchronisierungsbereich. Setzen Sie den Verarbeitungsanzeigestatus auf LeerlaufDas Folgende ist ein Code zur Beschreibung des Verbrauchsthread-Verarbeitungsprozesses: #🎜🎜 #public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
Das obige ist der detaillierte Inhalt vonSpringboot-Methode zur asynchronen Nachrichtenverarbeitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!