仕事上、非同期のメッセージ処理が必要なビジネスシーンに遭遇することがよくありますが、メッセージの性質に応じて処理方法が全く異なります。
1. メッセージは独立していません
独立したメッセージには通常、逐次的な依存関係があります。この時点で、メッセージ処理メカニズムは線形キュー処理モードに縮退し、1 つのコンシューマーのみがキューに進むことができます。シングルスレッド。メッセージを処理します。
2. メッセージは完全に独立しています
完全に独立したメッセージは、複数のコンシューマ (スレッド) によって同時に処理され、最大の同時処理能力を実現します。
3. メッセージは完全に独立しているわけではありません
通常、これは、同じソース (同じプロデューサー) からのメッセージが順序付けされる必要があり、異なるソースからのメッセージの順序が必要な場合です。それとは何の関係もありません。
このシナリオでのメッセージ処理は比較的複雑になります。同じソースからのメッセージの順序を保証するために、固定コンシューマ スレッドを同じソースからのメッセージにバインドすることを考えるのは簡単です。これは非常に簡単です。しかし大きな問題を抱えています。
プロデューサの数が多い場合、バインドされたスレッドの数が十分ではない可能性があります。もちろん、スレッド リソースは再利用でき、同じスレッドを複数のメッセージ ソースにバインドして処理することもできます。これにより、別のスレッドが発生します。問題: メッセージ ソース間の相互作用。
次のシナリオを考えてみましょう:
プロデューサー P1 が大量のメッセージを生成してキューに入り、処理のためにコンシューマー スレッド C1 に割り当てられます (C1 の処理には時間がかかる場合があります)。今回は、プロデューサ P2 が生成するP1 と P2 間の相互影響、また他のコンシューマー スレッドを十分に利用できないことにより、不均衡が生じます。
したがって、このような問題を回避することを考慮する必要があります。消費処理の適時性 (できるだけ早く)、分離 (相互干渉の回避)、バランス (同時処理の最大化) を実現します。
実装には 2 つのモードがありますが、考えやすいのはスレッド ディスパッチングです。モデル (PUSH モード)、具体的な方法は通常次のとおりです:
1. キューをポーリングしてメッセージを取得するグローバル メッセージ ディスパッチャーがあります。
2. メッセージ ソースに従って、処理のために適切なコンシューマ スレッドにメッセージをディスパッチします。
分散アルゴリズム メカニズムは、メッセージ ソースに基づくハッシュのように単純なものもあれば、各コンシューマ スレッドの現在の負荷、待機キューの長さ、メッセージの複雑さなどに応じて複雑なものもあります。総合的な分析に基づいて配布対象が選択されます。
単純なハッシュでは、上記のシナリオで説明した問題が確実に発生しますが、複雑な分散計算は実装が非常に面倒で複雑であることは明らかであり、効率は必ずしも良いとは限らず、完全なバランスを達成することは困難です。バランスの。
2 番目のモードは PULL メソッドを使用し、スレッドはオンデマンドでプルします。具体的なメソッドは次のとおりです:
1. メッセージ ソースは、生成されたメッセージを、対応する一時キューに直接入れます。ソース (以下のとおり) 示されている各セッションは異なるメッセージ ソースを表します)、セッションはブロッキング キューに配置され、
#2 の処理をスレッドに通知します。複数のコンシューマ スレッドが同時にキューをポーリングします。メッセージを競合します (1 つのスレッドのみが Go to 3 を実行することを保証します。キュー インジケーターが他のスレッドによって処理されているかどうかを確認します (実装には、スレッド レベルでの同一生成元メッセージに基づく検出同期が必要です) 4. 他のスレッドで処理されていない場合は、同期領域設定処理でステータスを示し、同期領域を抜けた後に一時キュー内のメッセージを処理します。処理が完了したら、最後に再び同期領域設定処理に入り、ステータスがアイドルであることを示します。 以下は、消費スレッドの処理プロセスを記述するコードです:public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
以上がSpringboot の非同期メッセージ処理メソッドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。