


Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?
皆さんこんにちは、ジュン兄です。
最近、インタビューに参加したときに読者から質問がありました。コンシューマが 100 個などのメッセージのバッチをプルした場合、100 番目のメッセージの消費は成功しますが、50 番目のメッセージの消費は失敗します。 . オフセット どのように更新されますか?この問題に関して、今日はメッセージのバッチが消費されなかった場合にオフセットを保存する方法について話しましょう。
1 プル メッセージ
1.1 プル リクエストのカプセル化
RocketMQ プッシュ モードを例にとると、RocketMQ コンシューマのスタートアップ コードは次のとおりです。上記の DefaultMQPushConsumer はプッシュ モードのコンシューマーであり、起動メソッドは start です。コンシューマーが開始されると、リバランス スレッド (RebalanceService) がトリガーされます。このスレッドのタスクは、無限ループで継続的にリバランスを行い、最後にメッセージを pullRequestQueue にプルするリクエストをカプセル化することです。このプロセスに関係する UML クラス図は次のとおりです。
1.2 プル リクエストの処理
プル リクエスト PullRequest をカプセル化した後、RocketMQ はメッセージを継続的に取得しません。処理のために pullRequestQueue からリクエストをプルします。 UML クラス図は次のとおりです。
メッセージをプルするためのエントリ メソッドは無限ループであり、コードは次のとおりです。ここにメッセージがある場合、それを PullCallback に送信すると、このコールバック関数によって処理されます。
プルされたメッセージは、まず ProcessQueue の msgTreeMap に置かれ、次に処理のためにスレッド クラス ConsumeRequest にカプセル化されます。コードを合理化した後の ConsumeRequest 処理ロジックは次のようになります。
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
2 消費結果の処理
2.1 同時メッセージ
同時メッセージ処理の消費結果のコードは次のように簡略化されます。次のとおりです:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
上記のコードからわかるように、メッセージ処理のロジックがシリアルである場合、たとえば記事の冒頭のコードでは for ループを使用してメッセージを処理します。特定のメッセージの処理が失敗した場合は、ループを直接終了し、ConsumeConcurrentlyContext の ackIndex 変数にメッセージ リスト内の失敗したメッセージの位置が割り当てられるため、この失敗したメッセージに続くメッセージは処理されなくなり、ブローカーに送信されます。再引っ張りを待ちます。コードは次のとおりです。
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
正常に消費されたメッセージは ProcessQueue の msgTreeMap から削除され、msgTreeMap の最小オフセット (firstKey) が更新のために返されます。注: クラスター モード オフセットはブローカー側に保存されます。オフセットを更新するには、メッセージをブローカーに送信する必要があります。ただし、ブロードキャスト モード オフセットはコンシューマー側に保存され、ローカル オフセットのみを更新する必要があります。
メッセージ処理のロジックが並列である場合、複数のメッセージが失敗する可能性があり、ackIndex 変数への値の割り当ては正確ではないため、メッセージ処理が失敗した後に ackIndex に値を割り当てても意味がありません。最善の方法は、ackIndex に 0 の値を割り当て、メッセージのバッチ全体を再度消費することですが、これにより他の問題が発生する可能性があります。
2.2 連続メッセージ
連続メッセージの場合、msgTreeMap からメッセージを取得した後、最初に commerceMsgOrderlyTreeMap に配置する必要があります。オフセットを更新するとき、最大のメッセージ オフセットは 消費MsgOrderlyTreeMap ( lastKey) から取得されます。 。
3 要約
最初の質問に戻りますが、メッセージのバッチが順番に消費される場合、100 番目のメッセージが正常に消費され、50 番目のメッセージが失敗することはあり得ません。 50 番目のメッセージが失敗すると、ループが終了し、消費が続行されないためです。
同時消費の場合、この状況が発生した場合は、メッセージのバッチ全体を再度消費すること、つまり、ackIndex に 0 の値を割り当てることをお勧めします。考慮されます。
以上がAlibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









1. SpringBoot は RocketMQ を統合します。SpringBoot に RocketMQ を統合するには、次の 4 つの簡単な手順だけが必要です: 1. 関連する依存関係 org.apache.rocketmqrocketmq-spring-boot-starter2 を導入します。RocketMQ の関連構成を追加します。メッセージの最大値。これは、消費された最大値ではなく、プルされたメッセージの最大値であることに注意してください。

皆さんこんにちは、ジュン兄です。最近、インタビュー中に読者から質問がありました。コンシューマーがメッセージのバッチ (100 メッセージなど) をプルし、100 番目のメッセージは正常に消費されますが、50 番目のメッセージは失敗する場合、オフセットはどのように更新されますか?この問題に関して、今日はメッセージのバッチが消費されなかった場合にオフセットを保存する方法について話しましょう。 1 メッセージのプル 1.1 プル リクエストのカプセル化 RocketMQ プッシュ モードを例にとると、RocketMQ コンシューマ スタートアップ コードは次のとおりです。 public static void main(String[] args) throws InterruptedException, MQClie

Docker に RocketMQ をインストールするプロセスは次のとおりです。 Docker ネットワークを作成します。 ターミナルで次のコマンドを実行して、コンテナ間の通信用の Docker ネットワークを作成します。 dockernetworkcreaterocketmq-network RocketMQ イメージをダウンロードします。 ターミナルで次のコマンドを実行して、ダウンロードしますRocketMQ Docker イメージ: dockerpullrocketmqinc/rocketmq NameServer コンテナーを開始します: ターミナルで次のコマンドを実行して NameServer コンテナーを開始します: dockerrun -d --namermqnamesrv --

アプリケーション シナリオ RocketMQ 消費を実装するとき、@RocketMQMessageListener アノテーションは通常、Group、Topic、および selectorExpression (データ フィルタリングと選択ルール) を定義するために使用されます。データの動的フィルタリングをサポートするために、通常、式が使用され、その後、動的切り替えが実行されます。 apollo または Cloudconfig を通じて。依存関係 org.apache.rocketmqrocketmq-spring-boot-starter2.0.4 コンシューマ コードを導入 @RocketMQMessageListener(consumerGroup=&qu)

PHP および MySQL におけるキュー プロデューサーおよびコンシューマー パターンの実装方法 インターネット ビジネスの急速な発展に伴い、システム内で大量のタスクを処理する必要性がますます高まっています。キューは、タスクを効率的に処理するための一般的なソリューションです。 PHP や MySQL でキューのプロデューサー/コンシューマー パターン (Producer-ConsumerPattern) を実装するのが一般的ですが、この記事では具体的な実装方法とコード例を紹介します。生産者・消費者モデル

同時コンピューティングにおける一般的な同期の課題は、プロデューサー/コンシューマー問題として知られています。複数のスレッドまたはプロセスが共有ソースにアクセスするときにそれらの操作を調整するように設計されていることを考えると、この問題にはバランスのとれた実行だけでなく複雑な通信タスクも必要です。今日の議論は、現代のコンピューター サイエンスのフレームワーク、特に C++ 実装の実践における重要性を認識しながら、この困難の背後にある概念を理解するのに役立ちます。生産者・消費者問題の定義と目的を理解する 生産者・消費者問題によってもたらされる課題の解決策は、情報の作成と使用の責任者の間で責任を明確に区別することから生まれます。プロデューサが新しいレコードを自ら生成する場合、コンシューマは操作を同期することでレコードが正しく使用されていることを確認します。競合状態やデッドロックなどの問題を避けるために注意する必要があります。

1 メッセージ publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//トピック名 privateStringtopic;//メッセージ拡張情報、タグ、キー、遅延レベルはすべてここに存在します privateMappproperties;//メッセージ本文、バイト配列 privatebyte[]body;//メッセージのキーを設定しますmessage , publicvoidsetKeys(Stringkeys){}//Set

1. Consumer が消費を表すことを説明します。インターフェイスは一般パラメータ t を受け取り、accept を呼び出し、パラメータに対して一連の操作を実行しますが、戻り値はありません。 2. インスタンス コンシューマの場合、消費用の入力パラメータを提供する必要があります。 classperson{StringfirstName;StringlastName;person(){}person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}Java は何に使用できますか? Java は主に次の場所で使用されます: 1.web
