Redis の特別なデータ型ストリーム

WBOY
リリース: 2022-10-11 17:47:36
転載
2960 人が閲覧しました

この記事では、Redis に関する関連知識を提供します。主に特殊なデータ型ストリームの関連コンテンツを紹介します。Redis は豊富なデータ型を提供しており、特殊なものは 4 つあります。ビットマップ、ハイパーログログ、地理空間、ストリーム、ストリーム関連の問題を見てみましょう。皆さんのお役に立てれば幸いです。

Redis の特別なデータ型ストリーム

推奨学習: Redis ビデオ チュートリアル

Redis Stream は、Redis 5.0 バージョンで新しく追加されたデータ型です。Redis は特別に使用されます。メッセージキュー用の設計されたデータ型。

Redis 5.0 Stream が登場する前は、メッセージ キューの実装には次のような独自の欠陥がありました。

  • パブリッシュ/サブスクライブ モデルは耐久性がなく、したがって信頼性が低かった。メッセージを保存し、オフラインで再接続しているクライアントの履歴メッセージを読み取ることができないという欠陥。

  • #List のメッセージ キューの実装方法は繰り返し使用できず、メッセージは後で削除されます。消費されるため、プロデューサーはグローバルに一意の ID を自分で実装する必要があります。

上記の問題に基づいて、Redis 5.0 では、このバージョンの最も重要な機能でもある Stream タイプを導入しました。これは、メッセージ キューを完全に実装するために使用されます。メッセージの永続性とメッセージの永続性をサポートします。グローバル メッセージの自動生成一意の ID、ACK 確認メッセージ モードのサポート、コンシューマー グループ モードのサポートなどにより、メッセージ キューの安定性と信頼性が向上します。

よく使用するコマンド

ストリームメッセージキュー操作コマンド:

  • XADD: Insert message,保証されているため、グローバルに一意の ID が自動的に生成されます。

  • XDEL: メッセージ ID に基づいてメッセージを削除します。

  • DEL: 全体を削除します。 Stream;

# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
127.0.0.1:6379> XADD s1 * name sid10t
"1665047636078-0"
127.0.0.1:6379> XADD s1 * name sidiot
"1665047646214-0"
# XDEL key id [id ...]
127.0.0.1:6379> XDEL s1 1665047646214-0
(integer) 1
# DEL key [key ...]
127.0.0.1:6379> DEL s1
(integer) 1
ログイン後にコピー

Redis の特別なデータ型ストリーム

  • XLEN: メッセージの長さをクエリします;

  • XREAD : メッセージを読み取るために使用され、ID によってデータを読み取ることができます;

  • XRANGE: 間隔メッセージを読み取ります;

  • XTRIM: メッセージの数をトリミングしますキューメッセージ;

  • # XLEN key
    127.0.0.1:6379> XLEN s1
    (integer) 2
    # XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    127.0.0.1:6379> XREAD streams s1 0-0
    1) 1) "s1"
       2) 1) 1) "1665047636078-0"
             2) 1) "name"
                2) "sid10t"
          2) 1) "1665047646214-0"
             2) 1) "name"
                2) "sidiot"
    127.0.0.1:6379> XREAD count 1 streams s1 0-0
    1) 1) "s1"
       2) 1) 1) "1665047636078-0"
             2) 1) "name"
                2) "sid10t"
        # XADD 了一条消息之后的扩展
        127.0.0.1:6379> XREAD streams s1 1665047636078-0
        1) 1) "s1"
           2) 1) 1) "1665047646214-0"
                 2) 1) "name"
                    2) "sidiot"
              2) 1) "1665053702766-0"
                 2) 1) "age"
                    2) "18"
    # XRANGE key start end [COUNT count]
    127.0.0.1:6379> XRANGE s1 - +
    1) 1) "1665047636078-0"
       2) 1) "name"
          2) "sid10t"
    2) 1) "1665047646214-0"
       2) 1) "name"
          2) "sidiot"
    3) 1) "1665053702766-0"
       2) 1) "age"
          2) "18"
    127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
    1) 1) "1665047636078-0"
       2) 1) "name"
          2) "sid10t"
    2) 1) "1665047646214-0"
       2) 1) "name"
          2) "sidiot"
    # XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
    127.0.0.1:6379> XLEN s1
    (integer) 3
    127.0.0.1:6379> XTRIM s1 maxlen 2
    (integer) 1
    127.0.0.1:6379> XLEN s1
    (integer) 2
    ログイン後にコピー
  • XGROUP CREATE: コンシューマグループの作成;

  • XREADGROUP: 次の形式でメッセージを読み取ります。コンシューマ グループ;

  • XPENDING および XACK:

XPENDING コマンドを使用して、「既読ですがまだ確認されていません」メッセージをクエリできます。各コンシューマ グループ内のすべてのコンシューマの数。

XACK コマンドは、メッセージ処理が完了したことをメッセージ キューに確認するために使用されます。

# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
# 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错;
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
# 0-0 从头开始消费,$ 从尾开始消费;
127.0.0.1:6379> XADD myStream * name sid10t
"1665057823181-0"
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
OK
127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
OK
# XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
1) 1) "myStream"
   2) 1) 1) "1665058086931-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665058090167-0"
         2) 1) "name"
            2) "sidiot"
ログイン後にコピー

アプリケーション シナリオ

メッセージ キュー

プロデューサー XADD コマンドを使用してメッセージを挿入します:

# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t
127.0.0.1:6379> XADD mymq * name sid10t
"1665058759764-0"
ログイン後にコピー

挿入が成功すると、グローバルに一意の ID: "1665058759764-0" が返されます。メッセージのグローバルに一意な ID は 2 つの部分で構成されます:

  • 最初の部分「1665058759764」は、データが挿入されたときにミリ秒単位で計算された現在のサーバー時間です。

    2 番目の部分は、現在のミリ秒における挿入されたメッセージのメッセージ シーケンス番号を示し、0 から始まる番号が付けられます。たとえば、「1665058759764-0」は、「1665058759764」ミリ秒以内の最初のメッセージを意味します。
  • コンシューマは、XREAD コマンドを通じてメッセージ キューからメッセージを読み取るときに、メッセージ ID を指定し、このメッセージ ID を持つ次のメッセージから読み取りを開始できます (これは入力メッセージであることに注意してください)クエリ入力IDのメッセージではなく、次のID情報の読み取りを開始します)。
  • 127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
    (nil)
    127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
    1) 1) "mymq"
       2) 1) 1) "1665058759764-0"
             2) 1) "name"
                2) "sid10t"
    ログイン後にコピー
ブロッキング読み取り (データがない場合のブロック) を実装したい場合は、XRAED を呼び出すときに BLOCK 構成項目を設定して、BRPOP と同様のブロッキング読み取り操作を実装できます。

たとえば、次のコマンドは、BLOCK 10000 の構成項目を設定します。10000 の単位はミリ秒です。これは、XREAD が最新のメッセージを読み取るときに、メッセージが到着しない場合、XREAD は 10000 ミリ秒間ブロックすることを示します。 10秒) を押してから戻ります。

# 命令最后的 $ 符号表示读取最新的消息
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.01s)
ログイン後にコピー

Stream の基本的なメソッドは、xadd を使用してメッセージを保存し、xread を使用してループ内のメッセージの読み取りをブロックし、メッセージ キューの単純なバージョンを実装します。対話プロセスは次の図に示すとおりです:

先ほど紹介したListの操作にも対応しています 次に、Streamの独自機能を見ていきましょう。 Redis の特別なデータ型ストリーム

Stream は XGROUP を使用してコンシューマ グループを作成できます。コンシューマ グループを作成した後、Stream は XREADGROUP コマンドを使用して、コンシューマ グループ内のコンシューマにメッセージを読み取ることができます。

2 つのコンシューマ グループを作成します。これら 2 つのコンシューマ グループによって消費されるメッセージ キューは mymq です。両方とも、最初のメッセージから読み取りを開始するように指定します:

# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。
127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
OK
# 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。
127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
OK
ログイン後にコピー

Consumer Consumer1 in Consumer Group group1 mymq メッセージ キューからのすべてのメッセージの読み取りは次のとおりです。

# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
ログイン後にコピー

メッセージ キュー内のメッセージがコンシューマ グループ内のコンシューマによって読み取られると、そのメッセージはコンシューマ グループ内の他のコンシューマによって読み取られなくなります。読み取り、つまり、同じコンシューマ グループ内のコンシューマは同じメッセージを消費できません。

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)
ログイン後にコピー

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
ログイン後にコピー

因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060632864-0"
         2) 1) "name"
            2) "sid10t"
            
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060633903-0"
         2) 1) "name"
            2) "sid10t"
            
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060634962-0"
         2) 1) "name"
            2) "sid10t"
ログイン後にコピー

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

Redis の特別なデータ型ストリーム

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

127.0.0.1:6379> XPENDING mymq group2
1) (integer) 4
2) "1665058759764-0"
3) "1665060634962-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"
ログイン後にコピー

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
1) 1) "1665060633903-0"
   2) "consumer2"
   3) (integer) 1888805
   4) (integer) 1
ログイン後にコピー

可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。

一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。

127.0.0.1:6379> XACK mymq group2 1665060633903-0
(integer) 1
ログイン後にコピー

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
(empty array)
ログイン後にコピー

小结

好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

  • 消息保序:XADD/XREAD

  • 阻塞读取:XREAD block

  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;

  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;

  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不可丢。

  • 消息可堆积。

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis の特別なデータ型ストリーム

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。

  • Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。

  • Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:

AOF 永続性は 1 秒ごとにディスクに書き込むように構成されていますが、このディスク書き込みプロセスは非同期であり、Redis がダウンするとデータが失われる可能性があります。

マスター/スレーブ レプリケーションは非同期でもあります。はい、マスターとスレーブを切り替えるときにデータが失われる可能性もあります (新しいウィンドウが開きます)。

ご覧のとおり、Redis はキュー ミドルウェア リンクでメッセージが失われないことを保証できません。クラスターのデプロイには、RabbitMQ や Kafka などのプロフェッショナルなキュー ミドルウェアが使用されます。プロデューサーがメッセージをパブリッシュするとき、キュー ミドルウェアは通常、「複数のノード」、つまり複数のコピーを書き込みます。失敗しても、クラスター内のデータは失われないことが保証されます。

2. Redis Stream メッセージは蓄積できますか?

Redis データはメモリに保存されます。つまり、メッセージ バックログが発生すると、Redis メモリは増加し続けます。マシンのメモリ制限を超えると、OOM に直面します。 。

そこで、Redis の Stream は、この状況を回避するために、キューの最大長を指定する機能を提供します。

キューの最大長を指定した場合、キュー長が上限を超えると古いメッセージは削除され、固定長の新しいメッセージのみが残ります。この観点から、メッセージがバックログされるときに Stream の最大長が指定されている場合でも、メッセージが失われる可能性があります。

しかし、Kafka や RabbitMQ などのプロフェッショナル メッセージ キューのデータはディスクに保存されるため、メッセージがバックログされると、より多くのディスク領域が占有されるだけです。

したがって、Redis をキューとして使用すると、次の 2 つの問題に直面します:

  • Redis 自体がデータを失う可能性があります;

  • メッセージの圧迫に直面すると、メモリ リソースが不足します;

したがって、Redis をメッセージ キューとして使用できるかどうかは、ビジネス シナリオによって異なります:

  • ビジネス シナリオが十分に単純で、データ損失の影響を受けにくく、メッセージ バックログの可能性が比較的小さい場合は、Redis をキューとして使用してもまったく問題ありません。

  • ビジネスに大量のメッセージがあり、メッセージ バックログが発生する可能性が比較的高く、データ損失が許容できない場合は、専門的なメッセージ キュー ミドルウェアを使用してください。

補足: Redis パブリッシュ/サブスクライブ メカニズムをメッセージ キューとして使用できないのはなぜですか?

パブリッシュおよびサブスクライブ メカニズムには次の欠点があり、これらはすべてデータの損失に関連しています:

  • パブリッシュ/サブスクライブ メカニズムはどのデータ型にも基づいて実装されていないため、 「データ永続性」機能、つまりパブリッシュ/サブスクライブ メカニズムの関連操作は、RDB および AOF に書き込まれません。Redis がクラッシュして再起動すると、パブリッシュ/サブスクライブ メカニズムのすべてのデータが失われます。

  • パブリッシュ/サブスクライブ モードは、「送信して忘れる」動作モードです。サブスクライバーがオフラインになって再接続した場合、以前の履歴メッセージを利用することはできません。

  • コンシューマ側に特定のメッセージのバックログ、つまりプロデューサによって送信されたメッセージがあり、それが 32M を超えるか、8M を超えたままの場合、コンシューマはそれらのメッセージを消費できない場合60 秒以内, コンシューマ側は強制的に切断されます。このパラメータは構成ファイルで設定されます。デフォルト値は client-output-buffer-limit pubsub 32mb 8mb 60 です。

したがって、パブリッシュ/サブスクライブ メカニズムは、インスタント通信シナリオにのみ適しています。たとえば、センチネル クラスター (新しいウィンドウが開きます) を構築するシナリオでは、パブリッシュ/サブスクライブ メカニズムが使用されます。

推奨される学習: Redis ビデオ チュートリアル

以上がRedis の特別なデータ型ストリームの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:juejin.im
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!