Inhaltsverzeichnis
1 Nachrichten abrufen
1.1 Pull-Anforderungen kapseln
1.2 Verarbeitung von Pull-Anfragen
2 Verarbeitung von Verbrauchsergebnissen
2.1 Gleichzeitige Nachrichten
2.2 Sequentielle Nachrichten
3 Zusammenfassung
Heim Technologie-Peripheriegeräte KI Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

Apr 12, 2023 pm 11:28 PM
rocketmq 消费者 consumer

Hallo zusammen, ich bin Bruder Jun.

Einem Leser wurde kürzlich während eines Interviews eine Frage gestellt. Wenn ein Verbraucher einen Stapel von Nachrichten abruft, z. B. 100, und die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, wie wird der Offset aktualisiert? Lassen Sie uns heute in Bezug auf dieses Problem darüber sprechen, wie der Offset gespeichert werden kann, wenn ein Nachrichtenstapel nicht verarbeitet werden kann.

1 Nachrichten abrufen

1.1 Pull-Anforderungen kapseln

Am Beispiel des RocketMQ-Push-Modus lautet der Startcode des RocketMQ-Verbrauchers wie folgt:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Nach dem Login kopieren

Der obige DefaultMQPushConsumer ist ein Push-Modus-Verbraucher und die Startmethode ist start. Nach dem Start des Verbrauchers wird der Neuausgleichsthread (RebalanceService) ausgelöst. Die Aufgabe dieses Threads besteht darin, den Ausgleich kontinuierlich in einer Endlosschleife durchzuführen und schließlich die Anforderung zum Abrufen der Nachricht in die pullRequestQueue zu kapseln. Das an diesem Prozess beteiligte UML-Klassendiagramm lautet wie folgt:

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

1.2 Verarbeitung von Pull-Anfragen

Nach der Kapselung der Pull-Anfrage PullRequest erhält RocketMQ kontinuierlich die Nachrichten-Pull-Anfrage von pullRequestQueue zur Verarbeitung. Das UML-Klassendiagramm lautet wie folgt:

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

Die Eingabemethode zum Abrufen von Nachrichten ist eine Endlosschleife und der Code lautet wie folgt:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}
Nach dem Login kopieren

Nachdem die Nachricht hierher gezogen wurde, wird sie an die PullCallback-Rückruffunktion für gesendet Verarbeitung.

Die abgerufene Nachricht wird zunächst in msgTreeMap in ProcessQueue abgelegt und dann zur Verarbeitung in die Thread-Klasse ConsumeRequest gekapselt. Nach der Straffung des Codes lautet die ConsumeRequest-Verarbeitungslogik wie folgt:

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}
Nach dem Login kopieren

2 Verarbeitung von Verbrauchsergebnissen

2.1 Gleichzeitige Nachrichten

Der Code für die gleichzeitige Verarbeitung von Verbrauchsergebnissen wird wie folgt vereinfacht:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}
Nach dem Login kopieren

Wie aus dem obigen Code ersichtlich ist Wenn die Nachricht verarbeitet wird, verwendet der Code am Anfang des Artikels beispielsweise eine for-Schleife, um Nachrichten zu verarbeiten. Wenn die Verarbeitung einer bestimmten Nachricht fehlschlägt, verlassen Sie die Schleife direkt und weisen Sie die Variable ackIndex von ConsumeConcurrentlyContext zu an die Position der fehlgeschlagenen Nachricht in der Nachrichtenliste, sodass die auf die erste fehlgeschlagene Nachricht folgenden Nachrichten nicht mehr verarbeitet werden und an den Broker gesendet werden, um auf einen erneuten Abruf zu warten. Der Code lautet wie folgt:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Nach dem Login kopieren

Die erfolgreich konsumierte Nachricht wird aus der msgTreeMap in der ProcessQueue entfernt und der kleinste Offset (firstKey) in der msgTreeMap wird zur Aktualisierung zurückgegeben. Hinweis: Der Cluster-Modus-Offset wird auf der Broker-Seite gespeichert. Um den Offset zu aktualisieren, muss eine Nachricht an den Broker gesendet werden. Der Broadcast-Modus-Offset wird jedoch auf der Consumer-Seite gespeichert und nur der lokale Offset muss aktualisiert werden.

Wenn die Logik der Nachrichtenverarbeitung parallel ist, ist es sinnlos, ackIndex einen Wert zuzuweisen, nachdem die Nachrichtenverarbeitung fehlgeschlagen ist, da mehrere Nachrichten fehlschlagen können und die Zuweisung eines Werts zur ackIndex-Variablen nicht korrekt ist. Der beste Weg besteht darin, ackIndex den Wert 0 zuzuweisen und den gesamten Nachrichtenstapel erneut zu verbrauchen, was zu anderen Problemen führen kann.

2.2 Sequentielle Nachrichten

Bei sequentiellen Nachrichten muss die Nachricht, nachdem sie von msgTreeMap übernommen wurde, zunächst auf sumptionMsgOrderlyTreeMap platziert werden. Beim Aktualisieren des Offsets wird der maximale Nachrichtenoffset (lastKey) von sumptionMsgOrderlyTreeMap übernommen.

3 Zusammenfassung

Zurück zur ursprünglichen Frage: Wenn ein Nachrichtenstapel der Reihe nach konsumiert wird, ist es unmöglich, dass die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, denn wenn die 50. Nachricht fehlschlägt, Die Schleife sollte beendet und der Konsum nicht mehr fortgesetzt werden.

Wenn diese Situation auftritt, wird empfohlen, den gesamten Nachrichtenstapel erneut zu konsumieren, dh ackIndex den Wert 0 zuzuweisen. Auf diese Weise müssen Probleme wie Unterwelt berücksichtigt werden.

Das obige ist der detaillierte Inhalt vonAlibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

AI Hentai Generator

AI Hentai Generator

Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

R.E.P.O. Energiekristalle erklärten und was sie tun (gelber Kristall)
3 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Beste grafische Einstellungen
3 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. So reparieren Sie Audio, wenn Sie niemanden hören können
3 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Wie man alles in Myrise freischaltet
4 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌

Heiße Werkzeuge

Notepad++7.3.1

Notepad++7.3.1

Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Wie integriert SpringBoot RocketMQ? Wie integriert SpringBoot RocketMQ? May 14, 2023 am 10:19 AM

1. SpringBoot integriert RocketMQ. Die Integration von RocketMQ in SpringBoot erfordert nur vier einfache Schritte: 1. Führen Sie die relevante Abhängigkeit org.apache.rocketmqrocketmq-spring-boot-starter2 einmal ein Der maximale Wert der Nachricht. Beachten Sie, dass es sich um den maximalen Wert der abgerufenen Nachricht handelt, nicht um den maximal verbrauchten Wert

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset? Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset? Apr 12, 2023 pm 11:28 PM

Hallo zusammen, ich bin Bruder Jun. Kürzlich wurde einem Leser während eines Interviews eine Frage gestellt: Wenn ein Verbraucher einen Stapel von Nachrichten abruft, z. B. 100, und die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, wie wird der Offset aktualisiert? Lassen Sie uns heute in Bezug auf dieses Problem darüber sprechen, wie der Offset gespeichert werden kann, wenn ein Nachrichtenstapel nicht verarbeitet werden kann. 1 Nachrichten abrufen 1.1 Pull-Anforderungen kapseln Am Beispiel des RocketMQ-Push-Modus lautet der Startcode des RocketMQ-Verbrauchers wie folgt: public static void main(String[] args) throws InterruptedException, MQClie

So lösen Sie die Fallstricke, die bei der Integration von RocketMQ mit SpringBoot auftreten So lösen Sie die Fallstricke, die bei der Integration von RocketMQ mit SpringBoot auftreten May 19, 2023 am 11:25 AM

Anwendungsszenarien Bei der Implementierung des RocketMQ-Verbrauchs wird im Allgemeinen die Annotation @RocketMQMessageListener verwendet, um Gruppe, Thema und Selektorausdruck (Datenfilter- und Auswahlregeln) zu definieren. Um die dynamische Filterung von Daten zu unterstützen, werden im Allgemeinen Ausdrücke verwendet und anschließend eine dynamische Umschaltung durchgeführt über Apollo oder Cloudconfig. Einführung der Abhängigkeit org.apache.rocketmqrocketmq-spring-boot-starter2.0.4 Verbrauchercode @RocketMQMessageListener(consumerGroup=&qu

Gemeinsame Umgebungsbereitstellung – Docker-Installation RocketMQ-Tutorial! Gemeinsame Umgebungsbereitstellung – Docker-Installation RocketMQ-Tutorial! Mar 07, 2024 am 09:30 AM

Der Prozess der Installation von RocketMQ in Docker ist wie folgt: Erstellen Sie ein Docker-Netzwerk: Führen Sie den folgenden Befehl im Terminal aus, um ein Docker-Netzwerk für die Kommunikation zwischen Containern zu erstellen: dockernetworkcreaterocketmq-network Laden Sie das RocketMQ-Image herunter: Führen Sie zum Herunterladen den folgenden Befehl im Terminal aus RocketMQ Docker-Image: dockerpullrocketmqinc/rocketmq Starten Sie den NameServer-Container: Führen Sie den folgenden Befehl im Terminal aus, um den NameServer-Container zu starten: dockerrun -d --namermqnamesrv --

So implementieren Sie Warteschlangenproduzenten- und -konsumentenmuster in PHP und MySQL So implementieren Sie Warteschlangenproduzenten- und -konsumentenmuster in PHP und MySQL Oct 15, 2023 pm 02:33 PM

Implementierungsmethoden von Warteschlangenproduzenten- und -konsumentenmustern in PHP und MySQL Mit der rasanten Entwicklung des Internetgeschäfts ist die Notwendigkeit, eine große Anzahl von Aufgaben im System zu bewältigen, immer dringlicher geworden. Warteschlangen sind eine gängige Lösung, um Aufgaben effizient zu erledigen. Die Implementierung des Producer-Consumer-Musters der Warteschlange (Producer-ConsumerPattern) in PHP und MySQL ist eine gängige Lösung. In diesem Artikel werden die spezifische Implementierungsmethode vorgestellt und Codebeispiele bereitgestellt. Produzenten-Konsumenten-Modell

Producer-Consumer-Problem und seine Implementierung in C++ Producer-Consumer-Problem und seine Implementierung in C++ Sep 17, 2023 pm 11:09 PM

Eine vorherrschende Synchronisationsherausforderung beim Concurrent Computing ist das sogenannte Producer-Consumer-Problem. Da mehrere Threads oder Prozesse darauf ausgelegt sind, ihre Vorgänge beim Zugriff auf eine gemeinsam genutzte Quelle zu koordinieren, erfordert dieses Problem komplexe Kommunikationsaufgaben sowie eine ausgewogene Ausführung. Die heutige Diskussion wird dazu beitragen, die Konzepte hinter dieser Schwierigkeit zu verstehen und gleichzeitig ihre Bedeutung in zeitgenössischen Informatik-Frameworks anzuerkennen – insbesondere in der C++-Implementierungspraxis. Definition und Zweck des Produzenten-Konsumenten-Problems verstehen Lösungen für die Herausforderungen des Produzenten-Konsumenten-Problems ergeben sich aus einer klaren Abgrenzung der Verantwortlichkeiten zwischen denjenigen, die für die Produktion und Nutzung von Informationen verantwortlich sind. Wenn Produzenten selbst neue Datensätze generieren, stellen Konsumenten sicher, dass diese korrekt verwendet werden, indem sie ihre Abläufe synchronisieren. Man muss darauf achten, Probleme wie Race Conditions oder Deadlocks zu vermeiden, z.B.

Analyse des Hochverfügbarkeitsbeispiels eines RocketMQ-Produzenten für die Java-Entwicklung Analyse des Hochverfügbarkeitsbeispiels eines RocketMQ-Produzenten für die Java-Entwicklung Apr 23, 2023 pm 11:28 PM

1 Nachricht publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//Themenname privateStringtopic;//Informationen zur Nachrichtenerweiterung, Tag, Schlüssel, Verzögerungsstufe sind alle hier vorhanden privateMappproperties;//Nachrichtentext, Byte-Array privatebyte[]body;//Legen Sie den Schlüssel fest message , publicvoidsetKeys(Stringkeys){}//Set

Welche Rolle spielt die Java-Consumer-Schnittstelle? Welche Rolle spielt die Java-Consumer-Schnittstelle? Apr 26, 2023 am 11:34 AM

1. Erklären Sie, dass der Verbraucher den Verbrauch darstellt, den allgemeinen Parameter t akzeptiert, Accept aufruft und eine Reihe von Operationen für die Parameter ausführt, es jedoch keinen Rückgabewert gibt. 2. Instanz Für den Verbraucher müssen wir Eingabeparameter für den Verbrauch bereitstellen. classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}Wofür kann Java hauptsächlich verwendet werden in: 1.web

See all articles