This article covers Stream, one of Redis's special data types. Redis provides a rich set of data types, four of which are special: Bitmap, HyperLogLog, Geospatial, and Stream. Let's take a look at Stream; I hope it helps.
Recommended learning: Redis video tutorial
Redis Stream is a data type added in Redis 5.0, designed specifically for message queues.
Before Stream arrived in Redis 5.0, the existing ways of implementing a message queue each had flaws, for example:
Publish/subscribe cannot persist messages, so it is unreliable, and a client that goes offline and reconnects cannot read historical messages;
A List-based message queue cannot be consumed repeatedly: a message is removed once it is consumed, and producers have to generate globally unique IDs themselves.
To address these problems, Redis 5.0 introduced the Stream type, the most important feature of that release, built for implementing message queues. It supports message persistence, automatically generates globally unique message IDs, supports acknowledging messages (ACK), and supports consumer groups, which makes message queues more stable and reliable.
Stream message queue commands:
XADD: inserts a message; ordering is guaranteed, and a globally unique ID can be generated automatically;
XDEL: deletes messages by message ID;
DEL: deletes the entire Stream;
# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
127.0.0.1:6379> XADD s1 * name sid10t
"1665047636078-0"
127.0.0.1:6379> XADD s1 * name sidiot
"1665047646214-0"

# XDEL key id [id ...]
127.0.0.1:6379> XDEL s1 1665047646214-0
(integer) 1

# DEL key [key ...]
127.0.0.1:6379> DEL s1
(integer) 1
XLEN: returns the number of messages;
XREAD: reads messages, starting after a given ID;
XRANGE: reads the messages within an ID range;
XTRIM: trims the stream to limit the number of messages;
# XLEN key
127.0.0.1:6379> XLEN s1
(integer) 2

# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREAD streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1665047636078-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665047646214-0"
         2) 1) "name"
            2) "sidiot"
127.0.0.1:6379> XREAD count 1 streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1665047636078-0"
         2) 1) "name"
            2) "sid10t"

# After XADD-ing one more message:
127.0.0.1:6379> XREAD streams s1 1665047636078-0
1) 1) "s1"
   2) 1) 1) "1665047646214-0"
         2) 1) "name"
            2) "sidiot"
      2) 1) "1665053702766-0"
         2) 1) "age"
            2) "18"

# XRANGE key start end [COUNT count]
127.0.0.1:6379> XRANGE s1 - +
1) 1) "1665047636078-0"
   2) 1) "name"
      2) "sid10t"
2) 1) "1665047646214-0"
   2) 1) "name"
      2) "sidiot"
3) 1) "1665053702766-0"
   2) 1) "age"
      2) "18"
127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
1) 1) "1665047636078-0"
   2) 1) "name"
      2) "sid10t"
2) 1) "1665047646214-0"
   2) 1) "name"
      2) "sidiot"

# XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
127.0.0.1:6379> XLEN s1
(integer) 3
127.0.0.1:6379> XTRIM s1 maxlen 2
(integer) 1
127.0.0.1:6379> XLEN s1
(integer) 2
XGROUP CREATE: creates a consumer group;
XREADGROUP: reads messages as a member of a consumer group;
XPENDING and XACK:
XPENDING lists, for the consumers in a consumer group, the messages that have been "read but not yet acknowledged";
XACK confirms to the message queue that a message has been processed;
# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
# Note: the stream given to XGROUP CREATE must already exist, otherwise an error is returned:
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.

# 0-0 consumes from the beginning; $ consumes from the end (only new messages):
127.0.0.1:6379> XADD myStream * name sid10t
"1665057823181-0"
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
OK
127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
OK

# XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
1) 1) "myStream"
   2) 1) 1) "1665058086931-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665058090167-0"
         2) 1) "name"
            2) "sidiot"
Message queue
The producer inserts a message with the XADD command:
# * tells Redis to generate a globally unique ID for the inserted entry
# Insert one message into the queue named mymq; the field is name and the value is sid10t
127.0.0.1:6379> XADD mymq * name sid10t
"1665058759764-0"
On success, the globally unique ID "1665058759764-0" is returned. The ID consists of two parts:
The first part, "1665058759764", is the server's current time in milliseconds when the entry was inserted;
The second part is the sequence number of the message within that millisecond, starting from 0. For example, "1665058759764-0" is the first message inserted during millisecond 1665058759764.
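The ID rules above can be sketched in a few lines of Python. This is a toy model, not Redis code: `StreamID` and `IdGenerator` are hypothetical names, and the current time is passed in as `now_ms` instead of being read from the server clock. The branch for a repeated (or backwards) millisecond mirrors the requirement that auto-generated IDs keep increasing.

```python
from typing import NamedTuple


class StreamID(NamedTuple):
    """A stream entry ID of the form <milliseconds>-<sequence>."""
    ms: int   # server time in milliseconds when the entry was added
    seq: int  # sequence number within that millisecond, starting at 0

    @classmethod
    def parse(cls, text: str) -> "StreamID":
        ms, seq = text.split("-")
        return cls(int(ms), int(seq))

    def __str__(self) -> str:
        return f"{self.ms}-{self.seq}"


class IdGenerator:
    """Toy model of IDs generated by XADD with '*': they must always increase."""

    def __init__(self) -> None:
        self.last = StreamID(0, 0)

    def next_id(self, now_ms: int) -> StreamID:
        if now_ms > self.last.ms:
            # A new millisecond: the sequence restarts at 0.
            new_id = StreamID(now_ms, 0)
        else:
            # Same millisecond (or the clock moved backwards):
            # keep the last timestamp and bump the sequence number.
            new_id = StreamID(self.last.ms, self.last.seq + 1)
        self.last = new_id
        return new_id


gen = IdGenerator()
print(gen.next_id(1665058759764))  # 1665058759764-0
print(gen.next_id(1665058759764))  # 1665058759764-1
print(gen.next_id(1665058759765))  # 1665058759765-0
# IDs compare as (ms, seq) tuples, which is what keeps the stream ordered.
print(StreamID.parse("1665058759764-1") > StreamID.parse("1665058759764-0"))  # True
```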
When consumers read from the queue with XREAD, they can specify a message ID, and reading starts from the message after that ID (note: XREAD returns the messages that follow the given ID, not the message with that ID itself).
127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
(nil)
127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
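A minimal sketch of the difference between XREAD's exclusive start ID and XRANGE's inclusive range, modelled on plain Python lists. The helper names `xread_after` and `xrange_inclusive` are hypothetical, the sketch only accepts full `<ms>-<seq>` IDs (no `-`, `+`, or `$`), and nothing here talks to a Redis server.

```python
from __future__ import annotations


def parse_id(text: str) -> tuple[int, int]:
    """Split a full stream ID such as "1665058759764-0" into (ms, seq)."""
    ms, seq = text.split("-")
    return int(ms), int(seq)


def xread_after(entries, last_id, count=None):
    """Toy XREAD: entries whose ID is strictly greater than last_id (exclusive)."""
    start = parse_id(last_id)
    found = [(eid, fields) for eid, fields in entries if parse_id(eid) > start]
    return found if count is None else found[:count]


def xrange_inclusive(entries, start_id, end_id):
    """Toy XRANGE: entries with start_id <= ID <= end_id (inclusive on both ends)."""
    lo, hi = parse_id(start_id), parse_id(end_id)
    return [(eid, fields) for eid, fields in entries if lo <= parse_id(eid) <= hi]


mymq = [("1665058759764-0", {"name": "sid10t"})]

print(xread_after(mymq, "1665058759764-0"))  # [] (redis-cli prints (nil))
print(xread_after(mymq, "1665058759763-0"))  # [('1665058759764-0', {'name': 'sid10t'})]
print(xrange_inclusive(mymq, "1665058759764-0", "1665058759764-0"))  # the same single entry
```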
To read in blocking mode (blocking while there is no data), set the BLOCK option when calling XREAD; this gives a blocking read similar to BRPOP.
For example, the command below sets BLOCK 10000. The unit is milliseconds: when XREAD asks for the latest message and none arrives, it blocks for 10000 milliseconds (10 seconds) and then returns.
# The trailing $ means read the latest messages, i.e. only messages added after this command is issued
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.01s)
The basic use of Stream, storing messages with XADD and reading them in a blocking loop with XREAD, is enough to implement a simple message queue. The interaction is shown in the figure below:
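To illustrate the XADD-plus-blocking-XREAD loop, here is a single-process Python sketch in which a thread plays the consumer and `threading.Condition.wait_for` stands in for `BLOCK`. `ToyStream`, `consume`, and `run_demo` are hypothetical names, entry IDs are simplified to increasing integers, and a real application would issue XADD/XREAD through a Redis client instead.

```python
import threading
import time


class ToyStream:
    """In-memory stand-in for one Redis stream (not a Redis client).

    Entry IDs are plain increasing integers instead of "<ms>-<seq>" strings.
    """

    def __init__(self) -> None:
        self._entries = []                  # list of (id, fields)
        self._cond = threading.Condition()

    def xadd(self, fields: dict) -> int:
        with self._cond:
            entry_id = len(self._entries) + 1
            self._entries.append((entry_id, fields))
            self._cond.notify_all()         # wake up any blocked readers
            return entry_id

    def xread_block(self, last_id: int, timeout_s: float) -> list:
        """Like XREAD BLOCK: wait up to timeout_s for entries newer than last_id."""
        with self._cond:
            self._cond.wait_for(
                lambda: bool(self._entries) and self._entries[-1][0] > last_id,
                timeout=timeout_s,
            )
            return [e for e in self._entries if e[0] > last_id]


def consume(stream: ToyStream, expected: int, received: list) -> None:
    last_id = 0                             # plays the role of "0-0"
    while len(received) < expected:
        # An empty result means the wait timed out: loop and block again.
        for entry_id, fields in stream.xread_block(last_id, timeout_s=1.0):
            received.append(fields)
            last_id = entry_id              # the next read starts after this entry


def run_demo(n: int = 3) -> list:
    stream = ToyStream()
    received = []
    reader = threading.Thread(target=consume, args=(stream, n, received), daemon=True)
    reader.start()
    for i in range(n):
        time.sleep(0.05)                    # the producer emits messages over time
        stream.xadd({"name": f"msg{i}"})
    reader.join(timeout=5)
    return received


print(run_demo())  # [{'name': 'msg0'}, {'name': 'msg1'}, {'name': 'msg2'}]
```

The consumer always passes the last ID it has seen, so each read resumes exactly where the previous one stopped, which is the same pattern as feeding the last returned ID back into XREAD.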
The List type introduced earlier supports these operations too. Next, let's look at the features unique to Stream.
Stream lets you create consumer groups with XGROUP; once a group exists, its consumers read messages with the XREADGROUP command.
Create two consumer groups, both consuming the mymq queue and both starting from the first message:
# Create a consumer group named group1; 0-0 means start reading from the first message.
127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
OK
# Create a consumer group named group2; 0-0 means start reading from the first message.
127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
OK
The command for consumer consumer1 in group1 to read all messages from the mymq queue is:
# The final argument ">" means start reading from the first message not yet delivered to this group.
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
Once a message in the queue has been read by one consumer in a consumer group, no other consumer in the same group can read it; that is, consumers in the same group never consume the same message.
For example, after running the XREADGROUP command above, running the same command again returns nothing:
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)
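However, consumers in different consumer groups can consume the same message (provided that each group's starting position, set when the group was created, comes before that message).
For example, consumer1 in group1 just consumed the message with ID 1665058759764-0. Now let consumer1 in group2 consume messages: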
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
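Because both groups I created start reading from the first message, the consumer in the second group can still consume the message with ID 1665058759764-0. So consumers in different consumer groups can consume the same message.
The purpose of a consumer group is to let its consumers share the work of reading messages, so we usually have each consumer read part of the messages, spreading the read load evenly across the consumers.
For example, the following commands let consumer1, consumer2, and consumer3 in group2 each read one message.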
# Have consumer1 in group2 consume one message from the mymq queue
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060632864-0"
         2) 1) "name"
            2) "sid10t"
# Have consumer2 in group2 consume one message from the mymq queue
127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060633903-0"
         2) 1) "name"
            2) "sid10t"
# Have consumer3 in group2 consume one message from the mymq queue
127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060634962-0"
         2) 1) "name"
            2) "sid10t"
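The consumer-group behaviour described above can be modelled with one cursor per group, similar to the last-delivered ID that XINFO GROUPS reports for each group. The sketch below is a toy in-memory model with hypothetical names (`ToyGroupStream`, `xgroup_create`, `xreadgroup`), integer IDs, and only the `>` form of reading; it ignores pending lists, BLOCK, and everything else a real server does.

```python
class ToyGroupStream:
    """In-memory model of one stream with consumer groups (illustration only).

    Entry IDs are increasing integers; start_after=0 plays the role of "0-0".
    """

    def __init__(self) -> None:
        self.entries = []         # list of (id, fields)
        self.last_delivered = {}  # group name -> last ID delivered to that group
        self.owner = {}           # (group, id) -> consumer the entry went to

    def xadd(self, **fields) -> int:
        entry_id = len(self.entries) + 1
        self.entries.append((entry_id, fields))
        return entry_id

    def xgroup_create(self, group: str, start_after: int = 0) -> None:
        self.last_delivered[group] = start_after

    def xreadgroup(self, group: str, consumer: str, count=None) -> list:
        """Like XREADGROUP GROUP group consumer [COUNT n] STREAMS key >."""
        last = self.last_delivered[group]
        batch = [e for e in self.entries if e[0] > last]
        if count is not None:
            batch = batch[:count]
        for entry_id, _ in batch:
            self.owner[(group, entry_id)] = consumer
        if batch:
            # Each group advances only its own cursor, so groups never affect each other.
            self.last_delivered[group] = batch[-1][0]
        return batch


s = ToyGroupStream()
s.xadd(name="sid10t")
s.xgroup_create("group1")
s.xgroup_create("group2")

print(s.xreadgroup("group1", "consumer1"))  # [(1, {'name': 'sid10t'})]
print(s.xreadgroup("group1", "consumer1"))  # [] (already delivered within group1)
print(s.xreadgroup("group2", "consumer1"))  # [(1, {'name': 'sid10t'})] (group2 has its own cursor)

for _ in range(3):
    s.xadd(name="sid10t")
for consumer in ("consumer1", "consumer2", "consumer3"):
    print(consumer, s.xreadgroup("group2", consumer, count=1))
# consumer1 [(2, {'name': 'sid10t'})]
# consumer2 [(3, {'name': 'sid10t'})]
# consumer3 [(4, {'name': 'sid10t'})]
```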
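For a message queue built on Stream, how can a consumer still read its unfinished messages after it fails or crashes and then restarts?
Stream automatically uses an internal queue (also called the PENDING List) to keep the messages read by each consumer in a consumer group, until the consumer tells the stream with the XACK command that the message "has been processed".
Consumer acknowledgement makes messaging more reliable. Usually, once the business logic has finished, the consumer runs XACK to confirm that the message has been consumed. The whole flow is shown in the figure below:
If the consumer fails to process a message, it does not send XACK to the stream, and the message stays in the pending list. After restarting, the consumer can use the XPENDING command to see the messages it has read but not yet acknowledged.
For example, let's check how many messages each consumer in group2 has read but not yet acknowledged: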
127.0.0.1:6379> XPENDING mymq group2
1) (integer) 4
2) "1665058759764-0"
3) "1665060634962-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"
To see exactly which messages a particular consumer has read, run:
# List the messages consumer2 in group2 has read from mymq (fields: ID, consumer, idle time in ms, delivery count)
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
1) 1) "1665060633903-0"
   2) "consumer2"
   3) (integer) 1888805
   4) (integer) 1
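As shown, the message consumer2 has read has the ID 1665060633903-0.
Once consumer2 has processed message 1665060633903-0, it can notify the stream with XACK, and the message is then removed from group2's pending list (the entry itself remains in the stream until it is deleted or trimmed).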
127.0.0.1:6379> XACK mymq group2 1665060633903-0
(integer) 1
Running XPENDING again shows that consumer2 no longer has any messages that were read but not yet acknowledged.
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
(empty array)
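The pending-list bookkeeping can be sketched the same way. In this toy model (hypothetical `ToyGroup` class, integer IDs, a single group), delivery records the entry in `pending`, `xpending_summary` returns the same four pieces of information as the summary form of XPENDING, and `xack` only removes IDs from the pending list, leaving the stream itself untouched.

```python
from collections import Counter


class ToyGroup:
    """Toy consumer group with a pending entries list (illustration only).

    The stream is a shared list of (id, fields) with increasing integer IDs.
    """

    def __init__(self, entries: list) -> None:
        self.entries = entries
        self.last_delivered = 0
        self.pending = {}          # entry id -> consumer that read it (not yet ACKed)

    def xreadgroup(self, consumer: str, count=None) -> list:
        batch = [e for e in self.entries if e[0] > self.last_delivered]
        if count is not None:
            batch = batch[:count]
        for entry_id, _ in batch:
            self.pending[entry_id] = consumer   # kept here until XACK
        if batch:
            self.last_delivered = batch[-1][0]
        return batch

    def xpending_summary(self):
        """(count, smallest id, largest id, per-consumer counts), like XPENDING key group."""
        if not self.pending:
            return (0, None, None, {})
        ids = sorted(self.pending)
        return (len(ids), ids[0], ids[-1], dict(Counter(self.pending.values())))

    def xpending_for(self, consumer: str) -> list:
        return sorted(i for i, c in self.pending.items() if c == consumer)

    def xack(self, *entry_ids: int) -> int:
        """Remove IDs from the pending list; the entries stay in the stream."""
        return sum(1 for i in entry_ids if self.pending.pop(i, None) is not None)


stream = [(1, {"name": "sid10t"})]
group2 = ToyGroup(stream)
group2.xreadgroup("consumer1")             # consumer1 reads entry 1
stream.extend([(2, {"name": "sid10t"}), (3, {"name": "sid10t"}), (4, {"name": "sid10t"})])
for consumer in ("consumer1", "consumer2", "consumer3"):
    group2.xreadgroup(consumer, count=1)

print(group2.xpending_summary())  # (4, 1, 4, {'consumer1': 2, 'consumer2': 1, 'consumer3': 1})
print(group2.xpending_for("consumer2"))  # [3]
print(group2.xack(3))                     # 1
print(group2.xpending_for("consumer2"))  # []
print(len(stream))                        # 4 (XACK does not delete the entry itself)
```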
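That's it for Stream-based message queues. To summarize:
Message ordering: XADD/XREAD
Blocking reads: XREAD BLOCK
Duplicate message handling: XADD automatically generates a globally unique ID for each message;
Message reliability: the PENDING List automatically keeps delivered messages, XPENDING shows the messages a consumer group has read but not acknowledged, and consumers confirm messages with XACK;
Consumption through consumer groups is supported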
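How does a Redis Stream message queue fall short of a dedicated message queue?
A dedicated message queue must guarantee two things:
Messages are never lost.
Messages can pile up (be backlogged).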
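1. Can Redis Stream lose messages?
Using a message queue involves three parts: the producer, the queue middleware, and the consumer, so guaranteeing that messages are not lost means guaranteeing that none of the three loses data.
Can a Redis Stream message queue guarantee that none of the three parts loses data?
Can the Redis producer lose messages? That depends on whether the producer handles error cases properly. Once a message has been produced and submitted to the MQ, receiving the ack response from the MQ middleware means the send succeeded. So as long as the producer checks return values and exceptions, and resends the message whenever an error is returned, no message is lost at this stage.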
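The producer-side rule (treat a successful reply as the acknowledgement and resend on error) can be sketched as a retry wrapper. `send_with_retry` and `FlakySender` are hypothetical; the failure is modelled with Python's built-in `ConnectionError`, whereas a real Redis client library raises its own exception types, so the `except` clause would need adjusting.

```python
import time


def send_with_retry(send, message, attempts: int = 3, backoff_s: float = 0.0):
    """Call send(message) until it succeeds, retrying on ConnectionError.

    `send` is a hypothetical callable standing in for an XADD call; a normal
    return (e.g. the generated ID) is treated as the middleware's ack.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return send(message)
        except ConnectionError as err:
            last_error = err
            if attempt < attempts:
                time.sleep(backoff_s * attempt)   # simple linear backoff
    raise RuntimeError(f"message not sent after {attempts} attempts") from last_error


class FlakySender:
    """Hypothetical sender that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def __call__(self, message: dict) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated network error")
        return "1665058759764-0"


sender = FlakySender(failures=2)
print(send_with_retry(sender, {"name": "sid10t"}), sender.calls)  # 1665058759764-0 3
```

One design caveat: if an XADD actually reached Redis but its reply was lost, the retry adds the message a second time, so consumers should be prepared to see duplicates.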
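Can the Redis consumer lose messages? No. Stream (the MQ middleware) automatically uses an internal queue (the PENDING List) to keep the messages that each consumer in a group has read but not yet acknowledged. After restarting, a consumer can use XPENDING to see the messages it has read but not finished processing, and by sending the XACK acknowledgement only after its business logic completes, it ensures no message is lost.
Can the Redis middleware itself lose messages? Yes. Redis can lose data in the following 2 scenarios: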
When AOF persistence is configured to flush to disk once per second (appendfsync everysec), the disk write happens asynchronously, so data may be lost if Redis goes down;
Master-slave replication is also asynchronous, so data may be lost during a master-slave failover.
As you can see, Redis cannot guarantee that messages are not lost in the queue-middleware stage. Dedicated queue middleware such as RabbitMQ or Kafka is deployed as a cluster, and when a producer publishes a message, the middleware usually writes it to multiple nodes, i.e. keeps multiple replicas. That way, even if one node fails, the data in the cluster is not lost.
2. Can Redis Stream messages pile up?
Redis stores data in memory, so once a message backlog builds up, Redis memory keeps growing; if it exceeds the machine's memory limit, there is a risk of OOM.
That is why Redis Stream lets you specify a maximum queue length, precisely to avoid this situation.
When a maximum length is specified and the queue grows beyond it, old messages are deleted and only the newest messages up to that length are kept. In other words, with a maximum length in place, a Stream under a backlog can still lose messages.
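The effect of a length cap under backlog can be shown with Python's `collections.deque(maxlen=...)`, which, like exact MAXLEN trimming, evicts the oldest entries first. This is an analogy rather than Redis code: `produce` is a hypothetical helper, and in Redis the `~` form of MAXLEN trims approximately, so a stream may hold somewhat more entries than the threshold.

```python
from collections import deque


def produce(stream: deque, messages) -> list:
    """Append messages to a length-capped queue and return the ones evicted unread."""
    dropped = []
    for message in messages:
        if stream.maxlen is not None and stream and len(stream) == stream.maxlen:
            dropped.append(stream[0])   # the oldest entry is about to be evicted
        stream.append(message)          # deque(maxlen=...) discards from the left
    return dropped


# Roughly like XADD mymq MAXLEN 3 * ... with exact trimming, while the consumer is stalled.
mymq = deque(maxlen=3)
lost = produce(mymq, [f"msg{i}" for i in range(1, 6)])
print(list(mymq))  # ['msg3', 'msg4', 'msg5']
print(lost)        # ['msg1', 'msg2']
```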
By contrast, dedicated message queues such as Kafka and RabbitMQ store their data on disk, so a backlog just takes up more disk space.
Therefore, using Redis as a queue raises two problems:
Redis itself may lose data;
When messages pile up, memory becomes tight;
So whether Redis can serve as your message queue depends on your business scenario:
If your scenario is simple enough, tolerant of data loss, and unlikely to build up a message backlog, using Redis as a queue is perfectly fine.
If your business handles a large volume of messages, backlogs are likely, and data loss is unacceptable, use dedicated message queue middleware.
Supplement: why can't the Redis publish/subscribe mechanism be used as a message queue?
Publish/subscribe has the following shortcomings, all related to losing data:
Publish/subscribe is not built on any data type, so it has no "data persistence": publish/subscribe operations are not written to RDB or AOF, and when Redis crashes and restarts, all publish/subscribe data is lost.
Publish/subscribe works in a "fire and forget" mode: if a subscriber goes offline and reconnects, it cannot consume the messages published in the meantime.
When messages back up on the consumer side, that is, the consumer cannot keep up with what the producer sends, the consumer is forcibly disconnected if its output buffer exceeds 32 MB, or stays above 8 MB for 60 seconds. These limits are set in the configuration file; the default is client-output-buffer-limit pubsub 32mb 8mb 60.
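To make the `client-output-buffer-limit pubsub 32mb 8mb 60` rule concrete, the sketch below replays a subscriber's output-buffer size, sampled once per second, against the hard and soft limits. It is a simplified model with hypothetical helper names; the exact comparisons (`>=` versus `>`) and the timing granularity used inside Redis are assumptions here. The unit parsing follows the convention documented in redis.conf, where `mb` means 1024*1024 bytes and `m` means 1000000 bytes.

```python
UNIT_BYTES = {"k": 1000, "kb": 1024, "m": 1000**2, "mb": 1024**2, "g": 1000**3, "gb": 1024**3}


def to_bytes(text: str) -> int:
    """Parse Redis-style memory units: "1mb" = 1024*1024 bytes, "1m" = 1000000 bytes."""
    text = text.lower()
    for suffix in ("kb", "mb", "gb", "k", "m", "g"):
        if text.endswith(suffix):
            return int(text[: -len(suffix)]) * UNIT_BYTES[suffix]
    return int(text)


def parse_limit(line: str):
    """'pubsub 32mb 8mb 60' -> (client class, hard bytes, soft bytes, soft seconds)."""
    client_class, hard, soft, seconds = line.split()
    return client_class, to_bytes(hard), to_bytes(soft), int(seconds)


def disconnect_at(samples, hard, soft, soft_seconds):
    """Return the second at which the subscriber would be dropped, or None.

    samples[t] is the subscriber's output-buffer size in bytes at second t.
    """
    soft_since = None
    for t, used in enumerate(samples):
        if used >= hard:
            return t                     # hard limit: disconnect immediately
        if used >= soft:
            if soft_since is None:
                soft_since = t           # start timing the soft-limit breach
            elif t - soft_since > soft_seconds:
                return t                 # above the soft limit for too long
        else:
            soft_since = None            # fell below the soft limit: reset the timer
    return None


_, HARD, SOFT, SECS = parse_limit("pubsub 32mb 8mb 60")
MB = 1024 * 1024
print(disconnect_at([40 * MB], HARD, SOFT, SECS))       # 0
print(disconnect_at([10 * MB] * 70, HARD, SOFT, SECS))  # 61
print(disconnect_at([10 * MB] * 30 + [1 * MB] + [10 * MB] * 30, HARD, SOFT, SECS))  # None
```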
Therefore, publish/subscribe is only suitable for instant-communication scenarios; for example, building a sentinel cluster relies on the publish/subscribe mechanism.
The above covers the Redis special data type Stream in detail. For more information, please follow other related articles on the PHP Chinese website!