


Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?
Hallo zusammen, ich bin Bruder Jun.
Einem Leser wurde kürzlich während eines Interviews eine Frage gestellt. Wenn ein Verbraucher einen Stapel von Nachrichten abruft, z. B. 100, und die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, wie wird der Offset aktualisiert? Lassen Sie uns heute in Bezug auf dieses Problem darüber sprechen, wie der Offset gespeichert werden kann, wenn ein Nachrichtenstapel nicht verarbeitet werden kann.
1 Nachrichten abrufen
1.1 Pull-Anforderungen kapseln
Am Beispiel des RocketMQ-Push-Modus lautet der Startcode des RocketMQ-Verbrauchers wie folgt:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Der obige DefaultMQPushConsumer ist ein Push-Modus-Verbraucher und die Startmethode ist start. Nach dem Start des Verbrauchers wird der Neuausgleichsthread (RebalanceService) ausgelöst. Die Aufgabe dieses Threads besteht darin, den Ausgleich kontinuierlich in einer Endlosschleife durchzuführen und schließlich die Anforderung zum Abrufen der Nachricht in die pullRequestQueue zu kapseln. Das an diesem Prozess beteiligte UML-Klassendiagramm lautet wie folgt:
1.2 Verarbeitung von Pull-Anfragen
Nach der Kapselung der Pull-Anfrage PullRequest erhält RocketMQ kontinuierlich die Nachrichten-Pull-Anfrage von pullRequestQueue zur Verarbeitung. Das UML-Klassendiagramm lautet wie folgt:
Die Eingabemethode zum Abrufen von Nachrichten ist eine Endlosschleife und der Code lautet wie folgt:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
Nachdem die Nachricht hierher gezogen wurde, wird sie an die PullCallback-Rückruffunktion für gesendet Verarbeitung.
Die abgerufene Nachricht wird zunächst in msgTreeMap in ProcessQueue abgelegt und dann zur Verarbeitung in die Thread-Klasse ConsumeRequest gekapselt. Nach der Straffung des Codes lautet die ConsumeRequest-Verarbeitungslogik wie folgt:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
2 Verarbeitung von Verbrauchsergebnissen
2.1 Gleichzeitige Nachrichten
Der Code für die gleichzeitige Verarbeitung von Verbrauchsergebnissen wird wie folgt vereinfacht:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
Wie aus dem obigen Code ersichtlich ist Wenn die Nachricht verarbeitet wird, verwendet der Code am Anfang des Artikels beispielsweise eine for-Schleife, um Nachrichten zu verarbeiten. Wenn die Verarbeitung einer bestimmten Nachricht fehlschlägt, verlassen Sie die Schleife direkt und weisen Sie die Variable ackIndex von ConsumeConcurrentlyContext zu an die Position der fehlgeschlagenen Nachricht in der Nachrichtenliste, sodass die auf die erste fehlgeschlagene Nachricht folgenden Nachrichten nicht mehr verarbeitet werden und an den Broker gesendet werden, um auf einen erneuten Abruf zu warten. Der Code lautet wie folgt:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Die erfolgreich konsumierte Nachricht wird aus der msgTreeMap in der ProcessQueue entfernt und der kleinste Offset (firstKey) in der msgTreeMap wird zur Aktualisierung zurückgegeben. Hinweis: Der Cluster-Modus-Offset wird auf der Broker-Seite gespeichert. Um den Offset zu aktualisieren, muss eine Nachricht an den Broker gesendet werden. Der Broadcast-Modus-Offset wird jedoch auf der Consumer-Seite gespeichert und nur der lokale Offset muss aktualisiert werden.
Wenn die Logik der Nachrichtenverarbeitung parallel ist, ist es sinnlos, ackIndex einen Wert zuzuweisen, nachdem die Nachrichtenverarbeitung fehlgeschlagen ist, da mehrere Nachrichten fehlschlagen können und die Zuweisung eines Werts zur ackIndex-Variablen nicht korrekt ist. Der beste Weg besteht darin, ackIndex den Wert 0 zuzuweisen und den gesamten Nachrichtenstapel erneut zu verbrauchen, was zu anderen Problemen führen kann.
2.2 Sequentielle Nachrichten
Bei sequentiellen Nachrichten muss die Nachricht, nachdem sie von msgTreeMap übernommen wurde, zunächst auf sumptionMsgOrderlyTreeMap platziert werden. Beim Aktualisieren des Offsets wird der maximale Nachrichtenoffset (lastKey) von sumptionMsgOrderlyTreeMap übernommen.
3 Zusammenfassung
Zurück zur ursprünglichen Frage: Wenn ein Nachrichtenstapel der Reihe nach konsumiert wird, ist es unmöglich, dass die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, denn wenn die 50. Nachricht fehlschlägt, Die Schleife sollte beendet und der Konsum nicht mehr fortgesetzt werden.
Wenn diese Situation auftritt, wird empfohlen, den gesamten Nachrichtenstapel erneut zu konsumieren, dh ackIndex den Wert 0 zuzuweisen. Auf diese Weise müssen Probleme wie Unterwelt berücksichtigt werden.
Das obige ist der detaillierte Inhalt vonAlibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

AI Hentai Generator
Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen



1. SpringBoot integriert RocketMQ. Die Integration von RocketMQ in SpringBoot erfordert nur vier einfache Schritte: 1. Führen Sie die relevante Abhängigkeit org.apache.rocketmqrocketmq-spring-boot-starter2 einmal ein Der maximale Wert der Nachricht. Beachten Sie, dass es sich um den maximalen Wert der abgerufenen Nachricht handelt, nicht um den maximal verbrauchten Wert

Hallo zusammen, ich bin Bruder Jun. Kürzlich wurde einem Leser während eines Interviews eine Frage gestellt: Wenn ein Verbraucher einen Stapel von Nachrichten abruft, z. B. 100, und die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, wie wird der Offset aktualisiert? Lassen Sie uns heute in Bezug auf dieses Problem darüber sprechen, wie der Offset gespeichert werden kann, wenn ein Nachrichtenstapel nicht verarbeitet werden kann. 1 Nachrichten abrufen 1.1 Pull-Anforderungen kapseln Am Beispiel des RocketMQ-Push-Modus lautet der Startcode des RocketMQ-Verbrauchers wie folgt: public static void main(String[] args) throws InterruptedException, MQClie

Anwendungsszenarien Bei der Implementierung des RocketMQ-Verbrauchs wird im Allgemeinen die Annotation @RocketMQMessageListener verwendet, um Gruppe, Thema und Selektorausdruck (Datenfilter- und Auswahlregeln) zu definieren. Um die dynamische Filterung von Daten zu unterstützen, werden im Allgemeinen Ausdrücke verwendet und anschließend eine dynamische Umschaltung durchgeführt über Apollo oder Cloudconfig. Einführung der Abhängigkeit org.apache.rocketmqrocketmq-spring-boot-starter2.0.4 Verbrauchercode @RocketMQMessageListener(consumerGroup=&qu

Der Prozess der Installation von RocketMQ in Docker ist wie folgt: Erstellen Sie ein Docker-Netzwerk: Führen Sie den folgenden Befehl im Terminal aus, um ein Docker-Netzwerk für die Kommunikation zwischen Containern zu erstellen: dockernetworkcreaterocketmq-network Laden Sie das RocketMQ-Image herunter: Führen Sie zum Herunterladen den folgenden Befehl im Terminal aus RocketMQ Docker-Image: dockerpullrocketmqinc/rocketmq Starten Sie den NameServer-Container: Führen Sie den folgenden Befehl im Terminal aus, um den NameServer-Container zu starten: dockerrun -d --namermqnamesrv --

Implementierungsmethoden von Warteschlangenproduzenten- und -konsumentenmustern in PHP und MySQL Mit der rasanten Entwicklung des Internetgeschäfts ist die Notwendigkeit, eine große Anzahl von Aufgaben im System zu bewältigen, immer dringlicher geworden. Warteschlangen sind eine gängige Lösung, um Aufgaben effizient zu erledigen. Die Implementierung des Producer-Consumer-Musters der Warteschlange (Producer-ConsumerPattern) in PHP und MySQL ist eine gängige Lösung. In diesem Artikel werden die spezifische Implementierungsmethode vorgestellt und Codebeispiele bereitgestellt. Produzenten-Konsumenten-Modell

Eine vorherrschende Synchronisationsherausforderung beim Concurrent Computing ist das sogenannte Producer-Consumer-Problem. Da mehrere Threads oder Prozesse darauf ausgelegt sind, ihre Vorgänge beim Zugriff auf eine gemeinsam genutzte Quelle zu koordinieren, erfordert dieses Problem komplexe Kommunikationsaufgaben sowie eine ausgewogene Ausführung. Die heutige Diskussion wird dazu beitragen, die Konzepte hinter dieser Schwierigkeit zu verstehen und gleichzeitig ihre Bedeutung in zeitgenössischen Informatik-Frameworks anzuerkennen – insbesondere in der C++-Implementierungspraxis. Definition und Zweck des Produzenten-Konsumenten-Problems verstehen Lösungen für die Herausforderungen des Produzenten-Konsumenten-Problems ergeben sich aus einer klaren Abgrenzung der Verantwortlichkeiten zwischen denjenigen, die für die Produktion und Nutzung von Informationen verantwortlich sind. Wenn Produzenten selbst neue Datensätze generieren, stellen Konsumenten sicher, dass diese korrekt verwendet werden, indem sie ihre Abläufe synchronisieren. Man muss darauf achten, Probleme wie Race Conditions oder Deadlocks zu vermeiden, z.B.

1 Nachricht publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//Themenname privateStringtopic;//Informationen zur Nachrichtenerweiterung, Tag, Schlüssel, Verzögerungsstufe sind alle hier vorhanden privateMappproperties;//Nachrichtentext, Byte-Array privatebyte[]body;//Legen Sie den Schlüssel fest message , publicvoidsetKeys(Stringkeys){}//Set

1. Erklären Sie, dass der Verbraucher den Verbrauch darstellt, den allgemeinen Parameter t akzeptiert, Accept aufruft und eine Reihe von Operationen für die Parameter ausführt, es jedoch keinen Rückgabewert gibt. 2. Instanz Für den Verbraucher müssen wir Eingabeparameter für den Verbrauch bereitstellen. classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}Wofür kann Java hauptsächlich verwendet werden in: 1.web
