首頁 > 資料庫 > Redis > Redis特殊資料型態之stream

Redis特殊資料型態之stream

WBOY
發布: 2022-10-11 17:47:36
轉載
3116 人瀏覽過

這篇文章為大家帶來了關於Redis的相關知識,其中主要介紹了一個特殊的資料類型stream的相關內容,redis提供了豐富的資料類型,特殊的有四種bitmap、hyperloglog、geospatial、stream,以下就一起來看stream的相關問題,希望對大家有幫助。

Redis特殊資料型態之stream

建議學習:Redis影片教學

Redis Stream 是Redis 5.0 版本新增加的資料類型,Redis 專門為訊息佇列設計的資料類型。

在Redis 5.0 Stream 沒出來之前,訊息佇列的實作方式都有各自的缺陷,例如:

  • 發布訂閱模式,不能持久化也就無法可靠的保存訊息,且對於離線重連的客戶端不能讀取歷史訊息的缺陷;

  • List 實作訊息佇列的方式不能重複消費,一個訊息消費完就會被刪除,而且生產者需要自行實現全域唯一ID。

基於上述問題,Redis 5.0 便推出了Stream 類型也是此版本最重要的功能,用於完美地實現訊息佇列,它支援訊息的持久化、支援自動產生全域唯一ID、支援ack 確認訊息的模式、支援消費群組模式等,讓訊息佇列更加的穩定可靠。

常用指令

Stream 訊息佇列操作指令:

  • XADD : 插入訊息,保證有序,可以自動產生全域唯一ID

  • XDEL : 根據訊息ID 刪除訊息;

  • DEL : 刪除整個Stream;

# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
127.0.0.1:6379> XADD s1 * name sid10t
"1665047636078-0"
127.0.0.1:6379> XADD s1 * name sidiot
"1665047646214-0"
# XDEL key id [id ...]
127.0.0.1:6379> XDEL s1 1665047646214-0
(integer) 1
# DEL key [key ...]
127.0.0.1:6379> DEL s1
(integer) 1
登入後複製

Redis特殊資料型態之stream

  • XLEN : 查詢訊息長度;

  • XREAD : 用於讀取訊息,可以按ID 讀取資料;

  • XRANGE : 讀取區間訊息;

  • XTRIM : 裁切佇列訊息個數;

# XLEN key
127.0.0.1:6379> XLEN s1
(integer) 2
# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREAD streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1665047636078-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665047646214-0"
         2) 1) "name"
            2) "sidiot"
127.0.0.1:6379> XREAD count 1 streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1665047636078-0"
         2) 1) "name"
            2) "sid10t"
    # XADD 了一条消息之后的扩展
    127.0.0.1:6379> XREAD streams s1 1665047636078-0
    1) 1) "s1"
       2) 1) 1) "1665047646214-0"
             2) 1) "name"
                2) "sidiot"
          2) 1) "1665053702766-0"
             2) 1) "age"
                2) "18"
# XRANGE key start end [COUNT count]
127.0.0.1:6379> XRANGE s1 - +
1) 1) "1665047636078-0"
   2) 1) "name"
      2) "sid10t"
2) 1) "1665047646214-0"
   2) 1) "name"
      2) "sidiot"
3) 1) "1665053702766-0"
   2) 1) "age"
      2) "18"
127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
1) 1) "1665047636078-0"
   2) 1) "name"
      2) "sid10t"
2) 1) "1665047646214-0"
   2) 1) "name"
      2) "sidiot"
# XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
127.0.0.1:6379> XLEN s1
(integer) 3
127.0.0.1:6379> XTRIM s1 maxlen 2
(integer) 1
127.0.0.1:6379> XLEN s1
(integer) 2
登入後複製
  • XGROUP CREATE : 建立消費者群組;

  • XREADGROUP : 以消費群組形式讀取訊息;

  • XPENDING 和XACK :

XPENDING 指令可以用來查詢每個消費群組內所有消費者「已讀取、但尚未確認」的訊息;

XACK 指令用於向訊息佇列確認訊息處理已完成;

# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
# 需要注意的是,XGROUP CREATE 的 streams 必须是一个存在的 streams,否则会报错;
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
# 0-0 从头开始消费,$ 从尾开始消费;
127.0.0.1:6379> XADD myStream * name sid10t
"1665057823181-0"
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
OK
127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
OK
# XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
1) 1) "myStream"
   2) 1) 1) "1665058086931-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665058090167-0"
         2) 1) "name"
            2) "sidiot"
登入後複製

應用程式場景

訊息佇列

生產者透過XADD 指令插入一則訊息:

# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 sid10t
127.0.0.1:6379> XADD mymq * name sid10t
"1665058759764-0"
登入後複製

插入成功後會傳回全域唯一的ID:"1665058759764-0"。訊息的全域唯一ID 由兩部分組成:

  • 第一部分「1665058759764」 是資料插入時,以毫秒為單位計算的目前伺服器時間;

  • 第二部分錶示插入訊息在目前毫秒內的訊息序號,這是從0 開始編號的。例如,「1665058759764-0」 就表示在 “1665058759764” 毫秒內的第 1 條訊息。

消費者透過XREAD 指令從訊息佇列中讀取訊息時,可以指定一個訊息ID,並從這個訊息ID 的下一則訊息開始進行讀取(注意是輸入訊息ID 的下一則資訊開始讀取,不是查詢輸入ID 的訊息)。

127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
(nil)
127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

如果想要實作阻塞讀取(當沒有資料時,阻塞住),可以呼叫 XRAED 時設定 BLOCK 配置項,實現類似 BRPOP 的阻塞讀取操作。

例如,下面這命令,設定了BLOCK 10000 的配置項,10000 的單位是毫秒,表示XREAD 在讀取最新訊息時,如果沒有訊息到來,XREAD 將阻塞10000 毫秒(即10 秒) ,然後再返回。

# 命令最后的 $ 符号表示读取最新的消息
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.01s)
登入後複製

Stream 的基礎方法,使用 xadd 存入訊息和 xread 循環阻塞讀取訊息的方式可以實現簡易版的訊息佇列,互動流程如下圖所示:

Redis特殊資料型態之stream

#前面介紹的這些操作List 也支援的,接下來再看看Stream 特有的功能。

Stream 可以使用 XGROUP 建立消費性群組,在建立消費群組之後,Stream 可以使用 XREADGROUP 指令讓消費性群組內的消費者讀取訊息。

建立兩個消費群組,這兩個消費群組消費的訊息佇列是mymq,都指定從第一則訊息開始讀取:

# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。
127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
OK
# 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。
127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
OK
登入後複製

消費群組group1 內的消費者consumer1從mymq 訊息佇列中讀取所有訊息的命令如下:

# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

訊息佇列中的訊息一旦被消費群組裡的一個消費者讀取了,就不能再被該消費群組內的其他消費者讀取了,即同一個消費組裡的消費者不能消費同一則訊息。

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)
登入後複製

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息) 。

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1665058759764-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1665058759764-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060632864-0"
         2) 1) "name"
            2) "sid10t"
            
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060633903-0"
         2) 1) "name"
            2) "sid10t"
            
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060634962-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams “消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

Redis特殊資料型態之stream

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

127.0.0.1:6379> XPENDING mymq group2
1) (integer) 4
2) "1665058759764-0"
3) "1665060634962-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"
登入後複製

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
1) 1) "1665060633903-0"
   2) "consumer2"
   3) (integer) 1888805
   4) (integer) 1
登入後複製

可以看到,consumer2 已读取的消息的 ID 是 1665060633903-0。

一旦消息 1665060633903-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。

127.0.0.1:6379> XACK mymq group2 1665060633903-0
(integer) 1
登入後複製

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
(empty array)
登入後複製

小结

好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

  • 消息保序:XADD/XREAD

  • 阻塞读取:XREAD block

  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;

  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;

  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不可丢。

  • 消息可堆积。

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis特殊資料型態之stream

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。

  • Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。

  • Redis 消息中间件会不会丢消息?会,Redis 在以下 2 个场景下,都会导致数据丢失:

AOF 持久化配置為每秒寫盤,但這個寫盤過程是異步的,Redis 宕機時會存在資料遺失的可能;

主從複製也是異步的,主從切換時,也存在遺失資料的可能(opens new window)。

可以看到,Redis 在佇列中間件環節無法保證訊息不會丟失。像RabbitMQ 或Kafka 這類專業的隊列中間件,在使用時是部署一個集群,生產者在發布訊息時,隊列中間件通常會寫“多個節點”,也就是有多個副本,這樣一來,即便其中一個節點掛了,也能保證叢集的資料不會遺失。

2、Redis Stream 訊息可堆積嗎?

Redis 的資料都儲存在記憶體中,這就意味著一旦發生訊息積壓,則會導致Redis 的記憶體持續成長,如果超過機器記憶體上限,就會面臨被OOM的風險。

所以 Redis 的 Stream 提供了可以指定佇列最大長度的功能,就是為了避免這種情況發生。

當指定佇列最大長度時,佇列長度超過上限後,舊訊息會被刪除,只保留固定長度的新訊息。這麼來看,Stream 在訊息積壓時,如果指定了最大長度,還是有可能遺失訊息的。

但 Kafka、RabbitMQ 專業的訊息佇列它們的資料都儲存在磁碟上,當訊息積壓時,無非就是多佔用一些磁碟空間。

因此,把Redis 當作佇列來使用時,會面臨的2 個問題:

  • Redis 本身可能會失去資料;

  • 面對訊息擠壓,記憶體資源會緊張;

所以,能不能將Redis 當作訊息佇列來使用,關鍵看你的業務場景:

  • 如果你的業務場景夠簡單,對於資料遺失不敏感,而且訊息積壓機率比較小的情況下,把Redis 當作佇列是完全可以的。

  • 如果你的業務有海量訊息,訊息積壓的機率比較大,並且不能接受資料遺失,那麼還是用專業的訊息佇列中間件吧。

補充:Redis 發布/訂閱機制為什麼不可以作為訊息佇列?

發布訂閱機制有以下缺點,都是跟遺失資料有關:

  • #發布/訂閱機制沒有基於任何資料型別實現,所以不具備「資料持久化」的能力,也就是發布/訂閱機制的相關操作,不會寫入到RDB 和AOF 中,當Redis 宕機重啟,發布/訂閱機制的資料也會全部遺失。

  • 發布訂閱模式是 「發後既忘」 的工作模式,如果有訂閱者離線重連之後不能消費之前的歷史訊息。

  • 當消費端有一定的訊息積壓時,也就是生產者發送的訊息,消費者消費不過來時,如果超過32M 或是60s 內持續保持在8M 以上,消費端會被強行斷開,這個參數是在設定檔中設定的,預設值是client-output-buffer-limit pubsub 32mb 8mb 60。

所以,發布/訂閱機制只適合即使通訊的場景,例如建構哨兵叢集 (opens new window)的場景採用了發布/訂閱機制。

推薦學習:Redis影片教學

以上是Redis特殊資料型態之stream的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:juejin.im
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板