Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?

WBOY
リリース: 2023-04-12 23:28:14
転載
980 人が閲覧しました

皆さんこんにちは、ジュン兄です。

最近、インタビューに参加したときに読者から質問がありました。コンシューマが 100 個などのメッセージのバッチをプルした場合、100 番目のメッセージの消費は成功しますが、50 番目のメッセージの消費は失敗します。 . オフセット どのように更新されますか?この問題に関して、今日はメッセージのバッチが消費されなかった場合にオフセットを保存する方法について話しましょう。

1 プル メッセージ

1.1 プル リクエストのカプセル化

RocketMQ プッシュ モードを例にとると、RocketMQ コンシューマのスタートアップ コードは次のとおりです。上記の DefaultMQPushConsumer はプッシュ モードのコンシューマーであり、起動メソッドは start です。コンシューマーが開始されると、リバランス スレッド (RebalanceService) がトリガーされます。このスレッドのタスクは、無限ループで継続的にリバランスを行い、最後にメッセージを pullRequestQueue にプルするリクエストをカプセル化することです。このプロセスに関係する UML クラス図は次のとおりです。

Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?1.2 プル リクエストの処理

プル リクエスト PullRequest をカプセル化した後、RocketMQ はメッセージを継続的に取得しません。処理のために pullRequestQueue からリクエストをプルします。 UML クラス図は次のとおりです。

Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?メッセージをプルするためのエントリ メソッドは無限ループであり、コードは次のとおりです。ここにメッセージがある場合、それを PullCallback に送信すると、このコールバック関数によって処理されます。

プルされたメッセージは、まず ProcessQueue の msgTreeMap に置かれ、次に処理のためにスレッド クラス ConsumeRequest にカプセル化されます。コードを合理化した後の ConsumeRequest 処理ロジックは次のようになります。

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
ログイン後にコピー

2 消費結果の処理

2.1 同時メッセージ

同時メッセージ処理の消費結果のコードは次のように簡略化されます。次のとおりです:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}
ログイン後にコピー

上記のコードからわかるように、メッセージ処理のロジックがシリアルである場合、たとえば記事の冒頭のコードでは for ループを使用してメッセージを処理します。特定のメッセージの処理が失敗した場合は、ループを直接終了し、ConsumeConcurrentlyContext の ackIndex 変数にメッセージ リスト内の失敗したメッセージの位置が割り当てられるため、この失敗したメッセージに続くメッセージは処理されなくなり、ブローカーに送信されます。再引っ張りを待ちます。コードは次のとおりです。

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}
ログイン後にコピー

正常に消費されたメッセージは ProcessQueue の msgTreeMap から削除され、msgTreeMap の最小オフセット (firstKey) が更新のために返されます。注: クラスター モード オフセットはブローカー側に保存されます。オフセットを更新するには、メッセージをブローカーに送信する必要があります。ただし、ブロードキャスト モード オフセットはコンシューマー側に保存され、ローカル オフセットのみを更新する必要があります。

メッセージ処理のロジックが並列である場合、複数のメッセージが失敗する可能性があり、ackIndex 変数への値の割り当ては正確ではないため、メッセージ処理が失敗した後に ackIndex に値を割り当てても意味がありません。最善の方法は、ackIndex に 0 の値を割り当て、メッセージのバッチ全体を再度消費することですが、これにより他の問題が発生する可能性があります。

2.2 連続メッセージ

連続メッセージの場合、msgTreeMap からメッセージを取得した後、最初に commerceMsgOrderlyTreeMap に配置する必要があります。オフセットを更新するとき、最大のメッセージ オフセットは 消費MsgOrderlyTreeMap ( lastKey) から取得されます。 。

3 要約

最初の質問に戻りますが、メッセージのバッチが順番に消費される場合、100 番目のメッセージが正常に消費され、50 番目のメッセージが失敗することはあり得ません。 50 番目のメッセージが失敗すると、ループが終了し、消費が続行されないためです。

同時消費の場合、この状況が発生した場合は、メッセージのバッチ全体を再度消費すること、つまり、ackIndex に 0 の値を割り当てることをお勧めします。考慮されます。

以上がAlibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:51cto.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート