皆さんこんにちは、ジュン兄です。
最近、インタビューに参加したときに読者から質問がありました。コンシューマが 100 個などのメッセージのバッチをプルした場合、100 番目のメッセージの消費は成功しますが、50 番目のメッセージの消費は失敗します。 . オフセット どのように更新されますか?この問題に関して、今日はメッセージのバッチが消費されなかった場合にオフセットを保存する方法について話しましょう。
RocketMQ プッシュ モードを例にとると、RocketMQ コンシューマのスタートアップ コードは次のとおりです。上記の DefaultMQPushConsumer はプッシュ モードのコンシューマーであり、起動メソッドは start です。コンシューマーが開始されると、リバランス スレッド (RebalanceService) がトリガーされます。このスレッドのタスクは、無限ループで継続的にリバランスを行い、最後にメッセージを pullRequestQueue にプルするリクエストをカプセル化することです。このプロセスに関係する UML クラス図は次のとおりです。
1.2 プル リクエストの処理
メッセージをプルするためのエントリ メソッドは無限ループであり、コードは次のとおりです。ここにメッセージがある場合、それを PullCallback に送信すると、このコールバック関数によって処理されます。
プルされたメッセージは、まず ProcessQueue の msgTreeMap に置かれ、次に処理のためにスレッド クラス ConsumeRequest にカプセル化されます。コードを合理化した後の ConsumeRequest 処理ロジックは次のようになります。
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
2 消費結果の処理
2.1 同時メッセージ
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
正常に消費されたメッセージは ProcessQueue の msgTreeMap から削除され、msgTreeMap の最小オフセット (firstKey) が更新のために返されます。注: クラスター モード オフセットはブローカー側に保存されます。オフセットを更新するには、メッセージをブローカーに送信する必要があります。ただし、ブロードキャスト モード オフセットはコンシューマー側に保存され、ローカル オフセットのみを更新する必要があります。
メッセージ処理のロジックが並列である場合、複数のメッセージが失敗する可能性があり、ackIndex 変数への値の割り当ては正確ではないため、メッセージ処理が失敗した後に ackIndex に値を割り当てても意味がありません。最善の方法は、ackIndex に 0 の値を割り当て、メッセージのバッチ全体を再度消費することですが、これにより他の問題が発生する可能性があります。
2.2 連続メッセージ
連続メッセージの場合、msgTreeMap からメッセージを取得した後、最初に commerceMsgOrderlyTreeMap に配置する必要があります。オフセットを更新するとき、最大のメッセージ オフセットは 消費MsgOrderlyTreeMap ( lastKey) から取得されます。 。
最初の質問に戻りますが、メッセージのバッチが順番に消費される場合、100 番目のメッセージが正常に消費され、50 番目のメッセージが失敗することはあり得ません。 50 番目のメッセージが失敗すると、ループが終了し、消費が続行されないためです。
以上がAlibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。