Heim > Technologie-Peripheriegeräte > KI > Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

WBOY
Freigeben: 2023-04-12 23:28:14
nach vorne
1033 Leute haben es durchsucht

Hallo zusammen, ich bin Bruder Jun.

Einem Leser wurde kürzlich während eines Interviews eine Frage gestellt. Wenn ein Verbraucher einen Stapel von Nachrichten abruft, z. B. 100, und die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, wie wird der Offset aktualisiert? Lassen Sie uns heute in Bezug auf dieses Problem darüber sprechen, wie der Offset gespeichert werden kann, wenn ein Nachrichtenstapel nicht verarbeitet werden kann.

1 Nachrichten abrufen

1.1 Pull-Anforderungen kapseln

Am Beispiel des RocketMQ-Push-Modus lautet der Startcode des RocketMQ-Verbrauchers wie folgt:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Nach dem Login kopieren

Der obige DefaultMQPushConsumer ist ein Push-Modus-Verbraucher und die Startmethode ist start. Nach dem Start des Verbrauchers wird der Neuausgleichsthread (RebalanceService) ausgelöst. Die Aufgabe dieses Threads besteht darin, den Ausgleich kontinuierlich in einer Endlosschleife durchzuführen und schließlich die Anforderung zum Abrufen der Nachricht in die pullRequestQueue zu kapseln. Das an diesem Prozess beteiligte UML-Klassendiagramm lautet wie folgt:

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

1.2 Verarbeitung von Pull-Anfragen

Nach der Kapselung der Pull-Anfrage PullRequest erhält RocketMQ kontinuierlich die Nachrichten-Pull-Anfrage von pullRequestQueue zur Verarbeitung. Das UML-Klassendiagramm lautet wie folgt:

Alibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?

Die Eingabemethode zum Abrufen von Nachrichten ist eine Endlosschleife und der Code lautet wie folgt:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}
Nach dem Login kopieren

Nachdem die Nachricht hierher gezogen wurde, wird sie an die PullCallback-Rückruffunktion für gesendet Verarbeitung.

Die abgerufene Nachricht wird zunächst in msgTreeMap in ProcessQueue abgelegt und dann zur Verarbeitung in die Thread-Klasse ConsumeRequest gekapselt. Nach der Straffung des Codes lautet die ConsumeRequest-Verarbeitungslogik wie folgt:

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}
Nach dem Login kopieren

2 Verarbeitung von Verbrauchsergebnissen

2.1 Gleichzeitige Nachrichten

Der Code für die gleichzeitige Verarbeitung von Verbrauchsergebnissen wird wie folgt vereinfacht:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}
Nach dem Login kopieren

Wie aus dem obigen Code ersichtlich ist Wenn die Nachricht verarbeitet wird, verwendet der Code am Anfang des Artikels beispielsweise eine for-Schleife, um Nachrichten zu verarbeiten. Wenn die Verarbeitung einer bestimmten Nachricht fehlschlägt, verlassen Sie die Schleife direkt und weisen Sie die Variable ackIndex von ConsumeConcurrentlyContext zu an die Position der fehlgeschlagenen Nachricht in der Nachrichtenliste, sodass die auf die erste fehlgeschlagene Nachricht folgenden Nachrichten nicht mehr verarbeitet werden und an den Broker gesendet werden, um auf einen erneuten Abruf zu warten. Der Code lautet wie folgt:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Nach dem Login kopieren

Die erfolgreich konsumierte Nachricht wird aus der msgTreeMap in der ProcessQueue entfernt und der kleinste Offset (firstKey) in der msgTreeMap wird zur Aktualisierung zurückgegeben. Hinweis: Der Cluster-Modus-Offset wird auf der Broker-Seite gespeichert. Um den Offset zu aktualisieren, muss eine Nachricht an den Broker gesendet werden. Der Broadcast-Modus-Offset wird jedoch auf der Consumer-Seite gespeichert und nur der lokale Offset muss aktualisiert werden.

Wenn die Logik der Nachrichtenverarbeitung parallel ist, ist es sinnlos, ackIndex einen Wert zuzuweisen, nachdem die Nachrichtenverarbeitung fehlgeschlagen ist, da mehrere Nachrichten fehlschlagen können und die Zuweisung eines Werts zur ackIndex-Variablen nicht korrekt ist. Der beste Weg besteht darin, ackIndex den Wert 0 zuzuweisen und den gesamten Nachrichtenstapel erneut zu verbrauchen, was zu anderen Problemen führen kann.

2.2 Sequentielle Nachrichten

Bei sequentiellen Nachrichten muss die Nachricht, nachdem sie von msgTreeMap übernommen wurde, zunächst auf sumptionMsgOrderlyTreeMap platziert werden. Beim Aktualisieren des Offsets wird der maximale Nachrichtenoffset (lastKey) von sumptionMsgOrderlyTreeMap übernommen.

3 Zusammenfassung

Zurück zur ursprünglichen Frage: Wenn ein Nachrichtenstapel der Reihe nach konsumiert wird, ist es unmöglich, dass die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, denn wenn die 50. Nachricht fehlschlägt, Die Schleife sollte beendet und der Konsum nicht mehr fortgesetzt werden.

Wenn diese Situation auftritt, wird empfohlen, den gesamten Nachrichtenstapel erneut zu konsumieren, dh ackIndex den Wert 0 zuzuweisen. Auf diese Weise müssen Probleme wie Unterwelt berücksichtigt werden.

Das obige ist der detaillierte Inhalt vonAlibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:51cto.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage