Redis はメッセージ キューと遅延メッセージ キューをどのように実装しますか?以下の記事ではRedisにおけるメッセージキューと遅延メッセージキューの実装方法を紹介しますので、ご参考になれば幸いです。
Redis に関しては、キャッシュとして考える人が多いかもしれません。実際、Redis はいくつかの単純なメッセージ キューの目的も実装できます。リスト データを使用できます。キューを実装するための構造体。 [関連する推奨事項: Redis ビデオ チュートリアル ]
lpush (左プッシュ)
by queue Store のいくつかのコマンド左側から入れます
rpush (右プッシュ)
キューの右側から格納します
lpop (左ポップ)
Takeキューの左側から取り出します
rpop (right Pop)
キューの右側から取り出します
上記の 4 つのコマンドは list ヘルプを使用できます。キューまたはスタックを実装します。キューの特性は高度です まず、スタックの特性は先入れ後出しです。
したがって、キュー実装では lpush rpop または rpush lpop を使用できます。
スタック実装は lpush lpop または rpush rpop です。
プロデューサーがメッセージを公開します
まず、rpush を使用して、notify-queue というキューに 5 つの要素、つまり 1 2 3 4 5 を追加します。これは、プロデューサーとしてメッセージをパブリッシュするためのものです。##消費者消費ニュース
プロデューサが rpush を使用しているため、コンシューマは lpop を使用する必要があります。下の図を参照してください。-queue が 1 から順番にメッセージを消費することを通知し続けています。最終的に、キューにはメッセージがなく、ポップアップは常に空になります 空のポーリングの問題解決策 A (休止状態)
空のポーリングはクライアントと Redis の両方でより多くのリソース消費を引き起こすため、クライアントに空のデータを受信したときに 1 秒間スリープし、1 秒後にデータをプルすることで、消費量を削減できます#Thread.sleep(1000) このソリューションにも欠陥があります。つまり、メッセージ消費の遅延が増加します。コンシューマーが 1 つだけの場合、遅延は 1 秒です。つまり、空のポーリングの後、たまたまスリープ状態になっていますが、この時点でたまたまメッセージが到着します。それでも、
コンシューマが複数ある場合、各コンシューマのスリープ時間をずらすことで多少のレイテンシは減りますが、何か良い方法はありますか? ほぼ0を達成できる方法待ち時間?
ソリューション B (読み取りのブロック)実際には、キュー データの取得に関する Redis のコマンドが 2 つあります。つまり、読み取りのブロックです。
blpop (左ポップのブロック)
brpop (右ポップのブロック)
キューにデータがない場合、読み取りのブロックは休止状態に入ります。メッセージが到着すると、すぐに反応します。データを読み取るため、lpop/rpop の代わりに blpop/brpop を使用すると、メッセージ遅延の問題を解決できます。
引き続き 3 つの属性、6、7、8
blpop を使用してキューを読み取ります。最後のパラメータは、読み取りをブロックするための待ち時間です。この時間が経過してもメッセージがない場合は、nil が返されます。この時点で、引き続き blpop 操作を繰り返すことができます。
読み取りブロック時のアイドル接続の自動切断の問題クライアントが読み取りブロックを使用する場合、ブロック時間が長すぎると、サービスは通常、それをアイドル接続として扱い、リソースを占有する無駄な接続を減らすために積極的に接続を切断します。このとき、クライアントは例外をスローします。
したがって、クライアントがブロッキングを使用する場合は注意してください。例外をキャッチし、再試行するなど、それに応じて処理する必要があります。
Java クライアントはメッセージ キューを実装しますコマンド ライン クライアント redis-cli が Java に変更される点を除いて、アイデアは上記と同じです。 1 つのスレッドまたは複数のスレッドが rpush,
を公開します。別のスレッドまたは複数のスレッドが BLPOP 消費を実行します。完成したコードは次のとおりです: https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue
Publisher
# サブスクライバ
##遅延キューの実装アイデアzset が役立ちますまず、zset はスコアでソートでき、スコアはタイムスタンプを保存できます。したがって、メッセージをパブリッシュするたびに、現在のタイムスタンプと遅延したタイムスタンプを使用します。メッセージを受信すると、zset データをインターセプトし、現在の時刻を満たしたメッセージを取得します (つまり、現在のタイムスタンプ以下のスコアを持つデータが取得されます。現在のタイムスタンプ以下のスコアは、メッセージが時間に達しました。メッセージが大きい場合は、消費するまでにしばらく待つ必要があることを意味します)。
主要コマンド zadd (パブリッシャー)、zrangebyscore (サブスクライバー)、zrem (サブスクライバーはデータ消費後に削除)
コマンドの実装
zadd を使用して 4 つのデータを追加しました。これらは 1 秒、2 秒、3 秒後に消費できるデータです (擬似的に、これは実際には単なるスコアです)。また、その後に消費できる kafka もあります。 10 秒です。
現在 3 秒目の場合は、1 秒以上 3 秒以下の zset 内のデータを取得します。この間隔のデータはまさに私たちが消費できるものであるため、はい、条件を満たす 3 つのデータを取り出したことがわかります。一度に 1 つのデータを消費する場合は、制限を追加できます。次の図は、消費できる最初のデータを取り出していることがわかります。 list の lpop/ や blpop とは異なることに注意してください (これらは、データがポップアップすると元のキュー内のデータを自動的に削除します)。データは取得されますが、zrem を使用しない場合は、それを削除してください。このデータはまだ zset に存在するため、他の人によって読み取られます。
しかし、zrem 最初に他の人によって削除 (消費) されている可能性があるため、コードは次のことも判断する必要があります。 zrem の戻り値が 0 より大きい場合、このメッセージが正常にプリエンプトされたかどうかが判断され、成功後にメッセージが正しく消費されます。
#コードの実装
パブリッシャー
# #Subscriber
##遅延効果をテストする
#完全なコード アドレス: https: //github.com/qiaomengnan16/redis-demo/tree/main/redis-layed-queue
#最適化、lua を使用した実装##上記で実装した遅延キューに問題があり、zremを使用してデータを取得するかどうかを判断する際、データが取得できていない可能性が高く、このまま読み続けると取得できない可能性があります。したがって、Lua スクリプトを使用して最適化を実行できます ( zrangebyscore と zrem をアトミックな操作にすることで、マルチスレッドの競合と取得できないリソースの無駄を回避できます)。
#結論一部のプロフェッショナル キュー ミドルウェアは、適用がより複雑になり、操作が増加します。 RabbitMQ などのメンテナンス コストがかかります。メッセージを送信する前に、Exchange スイッチを作成してからキューを作成する必要があります。その後、Exchange とキューをバインドする必要があります。メッセージを送信するときは、ルーティング キーを指定する必要があります。 シナリオが単純な場合は、redis を使用してキューを実装できますが、redis にはプロフェッショナル キューの特性がないことに注意してください。 ACK の保証はありません。これはメッセージの信頼性が低いことを意味します。消費に失敗するとメッセージは失われます。100% の信頼性が必要な場合でも、保証としてプロフェッショナル キュー ミドルウェアや ACK などのその他のメカニズムを使用する必要があります。
プログラミング関連の知識について詳しくは、プログラミング入門をご覧ください。 !
以上がRedis におけるメッセージキューと遅延メッセージキューの実装方法についての簡単な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。