自從在 Redis 4.0 引入模組後,使用者開始思考如何解決這些問題。其中一個用戶 Timothy Downs 透過 IRC 和我說:
\<forkfork> 我计划给这个模块增加一个事务日志式的数据类型 —— 这意味着大量的订阅者可以在不导致 redis 内存激增的情况下做一些像发布/订阅那样的事情\<forkfork> 订阅者持有他们在消息队列中的位置,而不是让 Redis 必须维护每个消费者的位置和为每个订阅者复制消息
他的想法啟發了我。經過幾天的思考,我明白這或許是我們一舉解決所有問題的機會。我需要去重新構思 「日誌」 的概念是什麼。日誌是個基本的程式設計元素,每個人都使用過它,因為它只是簡單地以追加模式開啟一個文件,並以一定的格式寫入資料。然而 Redis 資料結構必須是抽象的。它們在記憶體中,我們使用記憶體並不是因為我們懶,而是因為使用一些指針,我們可以概念化資料結構並把它們抽象,以使它們擺脫明確的限制。例如,一般來說日誌有幾個問題:偏移不是邏輯化的,而是真實的位元組偏移,如果你想要與條目插入的時間相關的邏輯偏移該怎麼辦?我們有範圍查詢可用。同樣,日誌通常很難進行垃圾回收:在一個只能進行追加操作的資料結構中怎麼去刪除舊的元素?好吧,在我們理想的日誌中,我們只需要說,我想要數字***的那個條目,而舊的元素一個也不要,等等。
當我從 Timothy 的想法中受到啟發,去嘗試著寫一個規範的時候,我使用了 Redis 集群中的 radix 樹去實現,優化了它內部的某些部分。這為實現一個有效利用空間的日誌提供了基礎,而且仍然有可能在對數時間內存取範圍。同時,我開始去讀關於Kafka 的流相關的內容以獲得另外的靈感,它也非常適合我的設計,***借鑒了Kafka消費組的概念,並且再次針對 Redis 進行最佳化,以適用於Redis 在記憶體中使用的情況。然而,該規範僅停留在紙面上,在一段時間後我幾乎把它從頭到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到 Redis 升級中。我希望 Redis 流能成為對於時間序列有用的特性,而不僅是一個常見的事件和訊息類別的應用程式。
從 Redis 大會回來後,整個夏天我都在實作一個叫 listpack 的函式庫。這個函式庫是 ziplist.c
的繼任者,那是一個表示在單一指派中的字串元素清單的資料結構。它是一個非常特殊的序列化格式,其特點在於也能夠以逆序(從右到左)解析:以便在各種用例中取代 ziplists。
結合 radix 樹和 listpacks 的特性,它可以很容易地去建立一個空間高效的日誌,並且還是可索引的,這意味著允許透過 ID 和時間進行隨機存取。自從這些就緒後,我開始去寫一些程式碼來實現流資料結構。我還在完成這個實現,不管怎樣,現在在 Github 上的 Redis 的 streams 分支裡它已經可以跑起來了。我並沒有聲稱那個API 是100% 的最終版本,但是,這有兩個有趣的事實:一,在那時只有消費群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的面向都已經實現了。二,一旦各方面都比較穩定了之後,我決定大概用兩個月的時間將所有的流的特性向後移植到 4.0 分支。這意味著 Redis 用戶想要使用串流,不用等待 Redis 4.2 發布,它們在生產環境馬上就可用了。這是可能的,因為作為一個新的資料結構,幾乎所有的程式碼改變都出現在新的程式碼裡面。除了阻塞列表操作之外:程式碼被重構了,我們對於流和列表阻塞操作共享了相同的程式碼,而大大簡化了 Redis 內部實作。
在某些方面,你可以認為串流是 Redis 清單的增強版本。流元素不再是單一的字串,而是由欄位和值所組成的物件。範圍查詢更適用且更快。在流中,每個條目都有一個 ID,它是一個邏輯偏移量。不同的客戶端可以阻塞等待比指定的 ID 更大的元素。 Redis 流的一個基本的命令是 XADD
。是的,所有的 Redis 流指令都是以一個 X
為前綴的。
> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0
这个 XADD
命令将追加指定的条目作为一个指定的流 —— “mystream” 的新元素。上面示例中的这个条目有两个字段:sensor-id
和 temperature
,每个条目在同一个流中可以有不同的字段。使用相同的字段名可以更好地利用内存。有意思的是,字段的排序是可以保证顺序的。XADD
仅返回插入的条目的 ID,因为在第三个参数中是星号(*
),表示由命令自动生成 ID。通常这样做就够了,但是也可以去强制指定一个 ID,这种情况用于复制这个命令到从服务器和AOF文件。
这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。1506871964177
是毫秒时间,它只是一个毫秒级的 UNIX 时间戳。圆点(.
)后面的数字 0
是一个序号,它是为了区分相同毫秒数的条目增加上去的。这两个数字都是 64 位的无符号整数。这意味着,我们可以在流中增加所有想要的条目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的 ID 和流中的***一个条目 ID 两者间的***的一个。因此,举例来说,即使是计算机时间回跳,这个 ID 仍然是增加的。在某些情况下,你可以认为流条目的 ID 是完整的 128 位数字。然而,事实上它们与被添加到的实例的本地时间有关,这意味着我们可以在毫秒级的精度的范围随意查询。
正如你想的那样,快速添加两个条目后,结果是仅一个序号递增了。我们可以用一个 MULTI
/EXEC
块来简单模拟“快速插入”:
> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1
在上面的示例中,也展示了无需指定任何初始模式的情况下,对不同的条目使用不同的字段。会发生什么呢?就像前面提到的一样,只有每个块(它通常包含 50-150 个消息内容)的***个消息被使用。并且,相同字段的连续条目都使用了一个标志进行了压缩,这个标志表示与“它们与这个块中的***个条目的字段相同”。因此,使用相同字段的连续消息可以节省许多内存,即使是字段集随着时间发生缓慢变化的情况下也很节省内存。
为了从流中检索数据,这里有两种方法:范围查询,它是通过 XRANGE
命令实现的;流播,它是通过 XREAD
命令实现的。XRANGE
命令仅取得包括从开始到停止范围内的全部条目。因此,举例来说,如果我知道它的 ID,我可以使用如下的命名取得单个条目:
> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "10.5"
不管怎样,你都可以使用指定的开始符号 -
和停止符号 +
表示最小和***的 ID。为了限制返回条目的数量,也可以使用 COUNT
选项。下面是一个更复杂的 XRANGE
示例:
> XRANGE mystream - + COUNT 21) 1) 1506871964177.0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "10.5"2) 1) 1506872463535.0 2) 1) "foo" 2) "10"
这里我们讲的是 ID 的范围,然后,为了取得在一个给定时间范围内的特定范围的元素,你可以使用 XRANGE
,因为 ID 的“序号” 部分可以省略。因此,你可以只指定“毫秒”时间即可,下面的命令的意思是:“从 UNIX 时间 1506872463 开始给我 10 个条目”:
127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0 2) 1) "foo" 2) "10"2) 1) 1506872463535.1 2) 1) "bar" 2) "20"
关于 XRANGE
需要注意的最重要的事情是,假设我们在回复中收到 ID,随后连续的 ID 只是增加了序号部分,所以可以使用 XRANGE
遍历整个流,接收每个调用的指定个数的元素。Redis 中的*SCAN
系列命令允许迭代 Redis 数据结构,尽管事实上它们不是为迭代设计的,但这样可以避免再犯相同的错误。
当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去获取单个元素时,使用 XRANGE
是非常***的。然而,在使用流的案例中,当数据到达时,它必须由不同的客户端来消费时,这就不是一个很好的解决方案,这需要某种形式的汇聚池。(对于 某些 应用程序来说,这可能是个好主意,因为它们仅是偶尔连接查询的)
XREAD
命令是为读取设计的,在同一个时间,从多个流中仅指定我们从该流中得到的***条目的 ID。此外,如果没有数据可用,我们可以要求阻塞,当数据到达时,就解除阻塞。类似于阻塞列表操作产生的效果,但是这里并没有消费从流中得到的数据,并且多个客户端可以同时访问同一份数据。
这里有一个典型的 XREAD
调用示例:
> XREAD BLOCK 5000 STREAMS mystream otherstream $ $
它的意思是:从 mystream
和 otherstream
取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。在 STREAMS
选项之后指定我们想要监听的关键字,***的是指定想要监听的 ID,指定的 ID 为 $
的意思是:假设我现在需要流中的所有元素,因此,只需要从下一个到达的元素开始给我。
如果我从另一个客户端发送这样的命令:
> XADD otherstream * message “Hi There”
在 XREAD
侧会出现什么情况呢?
1) 1) "otherstream" 2) 1) 1) 1506935385635.0 2) 1) "message" 2) "Hi There"
与收到的数据一起,我们也得到了数据的关键字。在下次调用中,我们将使用接收到的***消息的 ID:
> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0
依次类推。然而需要注意的是使用方式,客户端有可能在一个非常大的延迟之后再次连接(因为它处理消息需要时间,或者其它什么原因)。在这种情况下,期间会有很多消息堆积,为了确保客户端不被消息淹没,以及服务器不会因为给单个客户端提供大量消息而浪费太多的时间,使用 XREAD
的 COUNT
选项是非常明智的。
目前看起来还不错……然而,有些时候,流需要删除一些旧的消息。幸运的是,这可以使用 XADD
命令的 MAXLEN
选项去做:
> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2
它是基本意思是,如果在流中添加新元素后发现消息数量超过了 1000000
个,那么就删除旧的消息,以便于元素总量重新回到 1000000
以内。它很像是在列表中使用的 RPUSH
+ LTRIM
,但是,这次我们是使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将消耗一些 CPU 资源,所以在计算 MAXLEN
之前,尽可能使用 ~
符号,以表明我们不要求非常 精确 的 1000000 个消息,就是稍微多一些也不是大问题:
> XADD mystream MAXLEN ~ 1000000 * foo bar
这种方式的 XADD 仅当它可以删除整个节点的时候才会删除消息。相比普通的 XADD
,这种方式几乎可以自由地对流进行封顶。
这是***个 Redis 中尚未实现而在开发中的特性。灵感也是来自 Kafka,尽管在这里是以不同的方式实现的。重点是使用了 XREAD
,客户端也可以增加一个 GROUP <name>
选项。相同组的所有客户端将自动得到 不同的 消息。当然,同一个流可以被多个组读取。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在每个组内,消息是不会重复的。
当指定组时,能够指定一个 RETRY <milliseconds>
选项去扩展组:在这种情况下,如果消息没有通过 XACK
进行确认,它将在指定的毫秒数后进行再次投递。这将为消息投递提供更佳的可靠性,这种情况下,客户端没有私有的方法将消息标记为已处理。这一部分也正在开发中。
因为用来建模 Redis 流的设计,内存使用率是非常低的。这取决于它们的字段、值的数量和长度,对于简单的消息,每使用 100MB 内存可以有几百万条消息。此外,该格式设想为需要极少的序列化:listpack 块以 radix 树节点方式存储,在磁盘上和内存中都以相同方式表示的,因此它们可以很轻松地存储和读取。例如,Redis 可以在 0.3 秒内从 RDB 文件中读取 500 万个条目。这使流的复制和持久存储非常高效。
我还计划允许从条目中间进行部分删除。现在仅实现了一部分,策略是在条目在标记中标识条目为已删除,并且,当已删除条目占全部条目的比例达到指定值时,这个块将被回收重写,如果需要,它将被连到相邻的另一个块上,以避免碎片化。
以上是如何使用Redis的streams的詳細內容。更多資訊請關注PHP中文網其他相關文章!