Hallo zusammen, ich bin Bruder Jun.
Einem Leser wurde kürzlich während eines Interviews eine Frage gestellt. Wenn ein Verbraucher einen Stapel von Nachrichten abruft, z. B. 100, und die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, wie wird der Offset aktualisiert? Lassen Sie uns heute in Bezug auf dieses Problem darüber sprechen, wie der Offset gespeichert werden kann, wenn ein Nachrichtenstapel nicht verarbeitet werden kann.
Am Beispiel des RocketMQ-Push-Modus lautet der Startcode des RocketMQ-Verbrauchers wie folgt:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Der obige DefaultMQPushConsumer ist ein Push-Modus-Verbraucher und die Startmethode ist start. Nach dem Start des Verbrauchers wird der Neuausgleichsthread (RebalanceService) ausgelöst. Die Aufgabe dieses Threads besteht darin, den Ausgleich kontinuierlich in einer Endlosschleife durchzuführen und schließlich die Anforderung zum Abrufen der Nachricht in die pullRequestQueue zu kapseln. Das an diesem Prozess beteiligte UML-Klassendiagramm lautet wie folgt:
Nach der Kapselung der Pull-Anfrage PullRequest erhält RocketMQ kontinuierlich die Nachrichten-Pull-Anfrage von pullRequestQueue zur Verarbeitung. Das UML-Klassendiagramm lautet wie folgt:
Die Eingabemethode zum Abrufen von Nachrichten ist eine Endlosschleife und der Code lautet wie folgt:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
Nachdem die Nachricht hierher gezogen wurde, wird sie an die PullCallback-Rückruffunktion für gesendet Verarbeitung.
Die abgerufene Nachricht wird zunächst in msgTreeMap in ProcessQueue abgelegt und dann zur Verarbeitung in die Thread-Klasse ConsumeRequest gekapselt. Nach der Straffung des Codes lautet die ConsumeRequest-Verarbeitungslogik wie folgt:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
Der Code für die gleichzeitige Verarbeitung von Verbrauchsergebnissen wird wie folgt vereinfacht:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
Wie aus dem obigen Code ersichtlich ist Wenn die Nachricht verarbeitet wird, verwendet der Code am Anfang des Artikels beispielsweise eine for-Schleife, um Nachrichten zu verarbeiten. Wenn die Verarbeitung einer bestimmten Nachricht fehlschlägt, verlassen Sie die Schleife direkt und weisen Sie die Variable ackIndex von ConsumeConcurrentlyContext zu an die Position der fehlgeschlagenen Nachricht in der Nachrichtenliste, sodass die auf die erste fehlgeschlagene Nachricht folgenden Nachrichten nicht mehr verarbeitet werden und an den Broker gesendet werden, um auf einen erneuten Abruf zu warten. Der Code lautet wie folgt:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Die erfolgreich konsumierte Nachricht wird aus der msgTreeMap in der ProcessQueue entfernt und der kleinste Offset (firstKey) in der msgTreeMap wird zur Aktualisierung zurückgegeben. Hinweis: Der Cluster-Modus-Offset wird auf der Broker-Seite gespeichert. Um den Offset zu aktualisieren, muss eine Nachricht an den Broker gesendet werden. Der Broadcast-Modus-Offset wird jedoch auf der Consumer-Seite gespeichert und nur der lokale Offset muss aktualisiert werden.
Wenn die Logik der Nachrichtenverarbeitung parallel ist, ist es sinnlos, ackIndex einen Wert zuzuweisen, nachdem die Nachrichtenverarbeitung fehlgeschlagen ist, da mehrere Nachrichten fehlschlagen können und die Zuweisung eines Werts zur ackIndex-Variablen nicht korrekt ist. Der beste Weg besteht darin, ackIndex den Wert 0 zuzuweisen und den gesamten Nachrichtenstapel erneut zu verbrauchen, was zu anderen Problemen führen kann.
Bei sequentiellen Nachrichten muss die Nachricht, nachdem sie von msgTreeMap übernommen wurde, zunächst auf sumptionMsgOrderlyTreeMap platziert werden. Beim Aktualisieren des Offsets wird der maximale Nachrichtenoffset (lastKey) von sumptionMsgOrderlyTreeMap übernommen.
Zurück zur ursprünglichen Frage: Wenn ein Nachrichtenstapel der Reihe nach konsumiert wird, ist es unmöglich, dass die 100. Nachricht erfolgreich konsumiert wird, die 50. Nachricht jedoch fehlschlägt, denn wenn die 50. Nachricht fehlschlägt, Die Schleife sollte beendet und der Konsum nicht mehr fortgesetzt werden.
Wenn diese Situation auftritt, wird empfohlen, den gesamten Nachrichtenstapel erneut zu konsumieren, dh ackIndex den Wert 0 zuzuweisen. Auf diese Weise müssen Probleme wie Unterwelt berücksichtigt werden.
Das obige ist der detaillierte Inhalt vonAlibabas zweite Seite: Der RocketMQ-Verbraucher ruft eine Reihe von Nachrichten ab, aber einige davon können nicht konsumiert werden. Wie aktualisiere ich den Offset?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!