How does Redis implement a message queue and a delayed message queue? This article introduces how to implement both in Redis. I hope it will be helpful to you!
When it comes to Redis, most people think of it as a cache. In fact, Redis can also serve as a simple message queue, using the list data structure to implement the queue.
Several commands of the queue
lpush (left push)
Store it from the left side of the queue
rpush (right push)
Store it from the right side of the queue
lpop (left pop)
Take it out from the left side of the queue
rpop (right pop)
Take it out from the right side of the queue
The four commands above let a list act as either a queue or a stack. A queue is first in, first out; a stack is first in, last out.
So a queue can be implemented with lpush + rpop or rpush + lpop,
and a stack with lpush + lpop or rpush + rpop.
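The push/pop pairings above can be illustrated with a small in-memory model. This is only a sketch: `ListModel` is a hypothetical stand-in built on `java.util.ArrayDeque`, not a Redis client, and its method names merely mirror the Redis commands.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory stand-in for a Redis list (hypothetical; it does not talk to Redis).
class ListModel {
    private final Deque<String> items = new ArrayDeque<>();

    void lpush(String v) { items.addFirst(v); }  // like LPUSH: insert at the left end
    void rpush(String v) { items.addLast(v); }   // like RPUSH: insert at the right end
    String lpop() { return items.pollFirst(); }  // like LPOP: null when the list is empty
    String rpop() { return items.pollLast(); }   // like RPOP: null when the list is empty

    // Queue behaviour: rpush + lpop returns elements in insertion order.
    static List<String> asQueue(String... values) {
        ListModel list = new ListModel();
        for (String v : values) list.rpush(v);
        List<String> out = new ArrayList<>();
        for (String v = list.lpop(); v != null; v = list.lpop()) out.add(v);
        return out;
    }

    // Stack behaviour: rpush + rpop returns elements in reverse insertion order.
    static List<String> asStack(String... values) {
        ListModel list = new ListModel();
        for (String v : values) list.rpush(v);
        List<String> out = new ArrayList<>();
        for (String v = list.rpop(); v != null; v = list.rpop()) out.add(v);
        return out;
    }

    public static void main(String[] args) {
        System.out.println("queue: " + asQueue("1", "2", "3"));
        System.out.println("stack: " + asStack("1", "2", "3"));
    }
}
```

Pushing and popping on opposite ends gives queue order; pushing and popping on the same end gives stack order.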
Producer publishes messages
First we use rpush to add five elements, 1 2 3 4 5, to a queue called notify-queue. This is the producer publishing messages.
Consumer consumes messages
Since the producer uses rpush, the consumer must use lpop to read in first-in, first-out order. We keep consuming messages from notify-queue and read 1 through 5 in order. In the end there are no messages left in the queue, and every further pop returns empty.
When consuming messages with lpop above, once all messages have been consumed, every further pop reads an empty result.
Above we ran the commands by hand, but if a program keeps popping (pulling) data in a loop, it will poll an empty queue over and over (useless reads).
This both drives up CPU usage on the client and raises the QPS on Redis, all for no useful work, and these useless operations may slow down other clients' access to Redis.
Solution A (sleep)
Since empty polling consumes resources on both the client and Redis, we can let the client sleep for 1s whenever it receives empty data and pull again after that 1s, which reduces the consumption.
Thread.sleep(1000)
This solution also has a flaw: message latency increases. With a single consumer the delay can be up to 1s: if a message arrives just after an empty poll, while the consumer is asleep, it is not consumed until the consumer wakes up.
With multiple consumers, their sleep times are staggered, so latency is somewhat reduced. But is there a better way, one that achieves almost zero latency?
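The sleep-on-empty loop from Solution A might look like the sketch below. The `pop` supplier is a hypothetical stand-in for an lpop call (here backed by an in-memory queue, not Redis), and `consume` with its parameters is an illustrative name, not part of any client library.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

class SleepingPoller {
    // Polls `pop` until `wanted` messages have been read, sleeping after each
    // empty read. Returns the number of empty reads that occurred.
    static int consume(Supplier<String> pop, int wanted, long sleepMillis) {
        int emptyReads = 0;
        int received = 0;
        while (received < wanted) {
            String msg = pop.get();          // stand-in for lpop; null means "queue empty"
            if (msg == null) {
                emptyReads++;
                try {
                    Thread.sleep(sleepMillis); // back off instead of spinning
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;                   // stop polling if the thread is interrupted
                }
                continue;
            }
            received++;                      // a real consumer would process msg here
        }
        return emptyReads;
    }

    public static void main(String[] args) throws Exception {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        Thread producer = new Thread(() -> {
            try { Thread.sleep(250); } catch (InterruptedException e) { return; }
            queue.add("hello");
        });
        producer.start();
        int emptyReads = consume(queue::poll, 1, 100);
        producer.join();
        System.out.println("empty reads before the message arrived: " + emptyReads);
    }
}
```

The sleep interval is the trade-off knob: a longer sleep means fewer useless reads but a longer worst-case delay before a message is picked up.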
Solution B (Blocking Read)
Redis actually has two more commands for fetching data from a queue, the blocking reads:
blpop (blocking left pop)
brpop (blocking right pop)
A blocking read goes to sleep when there is no data in the queue and wakes up to read the data as soon as a message arrives, so replacing lpop/rpop with blpop/brpop solves the message latency problem.
Push three more elements, 6, 7, and 8, onto the queue.
Use blpop to read the queue. The last parameter is the timeout (in seconds) for the blocking read. If no message arrives within this time, nil is returned, and you can simply repeat the blpop operation.
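The behaviour of a blocking read with a timeout can be modeled in memory. This is a sketch only: `BlockingPopModel` is a hypothetical class built on `java.util.concurrent.LinkedBlockingDeque`, it does not connect to Redis, and it takes its timeout in milliseconds for convenience, whereas the blpop command takes seconds.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a Redis list with a blocking left pop (hypothetical).
class BlockingPopModel {
    private final BlockingDeque<String> items = new LinkedBlockingDeque<>();

    void rpush(String v) { items.addLast(v); }  // like RPUSH

    // Like BLPOP with a timeout: waits up to timeoutMillis for an element and
    // returns null if none arrives in time (or if the thread is interrupted).
    String blpop(long timeoutMillis) {
        try {
            return items.pollFirst(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingPopModel queue = new BlockingPopModel();
        Thread producer = new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException e) { return; }
            queue.rpush("6");
        });
        producer.start();
        System.out.println(queue.blpop(2000)); // wakes up once the producer pushes
        System.out.println(queue.blpop(100));  // queue is empty again: times out
        producer.join();
    }
}
```

Unlike the sleep-based loop, the consumer here does not pick a polling interval: it is woken by the push itself, and the timeout only bounds how long it waits when nothing arrives.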
The problem of idle connections being disconnected during blocking reads
When a client uses a blocking read and blocks for too long, the server will generally treat the connection as idle and actively close it, so that useless connections do not occupy resources. When this happens, the client throws an exception.
So note that a client using blocking reads needs to catch exceptions and handle them accordingly, for example by retrying.
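One way to structure the catch-and-retry handling is sketched below. `readWithRetry` and its parameters are hypothetical names; the `Callable` stands in for whatever blocking-read call your client exposes, and a real client would typically also need to reconnect before the next attempt.

```java
import java.util.concurrent.Callable;

class RetryingReader {
    // Calls blockingPop; if it throws, waits backoffMillis and tries again,
    // up to maxAttempts times. Rethrows the last failure if every attempt fails.
    static String readWithRetry(Callable<String> blockingPop, int maxAttempts, long backoffMillis) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return blockingPop.call();
            } catch (Exception e) {
                last = new RuntimeException("blocking read failed on attempt " + attempt, e);
                try {
                    Thread.sleep(backoffMillis);   // brief pause before retrying
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;                         // give up if interrupted
                }
            }
        }
        throw last != null ? last : new IllegalArgumentException("maxAttempts must be positive");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated read: fails twice (as if the server closed the idle
        // connection), then succeeds.
        String msg = readWithRetry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("connection closed by server");
            return "6";
        }, 5, 50);
        System.out.println(msg + " after " + calls[0] + " calls");
    }
}
```

Bounding the number of attempts keeps a permanently broken connection from turning into an endless retry loop.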
Java client implements a message queue
The idea is the same as above, except that the command-line client redis-cli is replaced with Java. One or more threads publish with rpush,
and another thread or threads consume with blpop. The complete code is at: https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue
Publisher
Subscriber
For the delayed message queue, the publisher stores each message in a zset, with the score being the timestamp at which the message becomes consumable. When the consumer retrieves messages, it reads the range of the zset whose score is less than or equal to the current timestamp. A score less than or equal to the current timestamp means the message is due; a larger score means it has to wait a while before it can be consumed. The key commands are zadd (publisher), zrangebyscore (subscriber), and zrem (subscriber deletes the data after consuming it).
Command implementation
We use zadd to add 4 pieces of data: three that can be consumed after 1, 2, and 3 seconds respectively (loosely speaking; these are really just scores), plus kafka, which can be consumed after 10 seconds.
If it is now the third second, we take the data in the zset with a score greater than or equal to 1 and less than or equal to 3, because the data in this interval is exactly what we can consume now. This returns the 3 pieces of data that meet the condition.
If you only want to consume one piece of data at a time, you can add a limit, which returns only the first consumable piece of data.
Also note the difference from lpop and blpop on a list, which automatically delete the data from the queue when they pop it. With zrangebyscore, the data is read but stays in the zset, so unless you delete it with zrem, other consumers will read it too. The zrem may also find that the data has already been deleted (consumed) by someone else, so the code must check whether the return value of zrem is greater than 0 to determine whether we successfully claimed the message, and consume it only on success.
Code implementation
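The read-then-claim flow described above can be modeled in memory as follows. This is a sketch under stated assumptions: `DelayQueueModel` is a hypothetical stand-in for a zset (not a Redis client), the member names `msg-1` to `msg-3` are made up for illustration (only `kafka` comes from the text), and ties between equal scores are not ordered the way a real zset would order them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a Redis zset used as a delay queue (hypothetical).
class DelayQueueModel {
    private final Map<String, Long> scores = new HashMap<>();

    // Like ZADD: store the member with its score (the time it becomes due).
    synchronized void zadd(String member, long score) { scores.put(member, score); }

    // Like ZRANGEBYSCORE: members with min <= score <= max, ordered by score.
    synchronized List<String> zrangeByScore(long min, long max) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, Long> e : scores.entrySet()) {
            if (e.getValue() >= min && e.getValue() <= max) hits.add(e.getKey());
        }
        hits.sort(Comparator.comparing(scores::get));
        return hits;
    }

    // Like ZREM: returns 1 if the member was present and removed, otherwise 0.
    synchronized int zrem(String member) { return scores.remove(member) == null ? 0 : 1; }

    // One consumer pass: read the due members, then claim each one with zrem.
    // Only members whose zrem returned > 0 belong to this consumer.
    List<String> consumeDue(long now) {
        List<String> claimed = new ArrayList<>();
        for (String member : zrangeByScore(0, now)) {
            if (zrem(member) > 0) claimed.add(member);
        }
        return claimed;
    }

    public static void main(String[] args) {
        DelayQueueModel queue = new DelayQueueModel();
        queue.zadd("msg-1", 1);
        queue.zadd("msg-2", 2);
        queue.zadd("msg-3", 3);
        queue.zadd("kafka", 10);
        System.out.println(queue.consumeDue(3));  // the three messages due by second 3
        System.out.println(queue.zrem("msg-1"));  // already claimed, so nothing to remove
        System.out.println(queue.consumeDue(10)); // kafka becomes due at second 10
    }
}
```

Between `zrangeByScore` and `zrem` another consumer may claim the same member, which is why the return value of `zrem` decides ownership rather than the read itself.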
Publisher
Subscriber
Test the delay effect
Full code address: https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue
Optimization, using Lua
The delay queue implemented above has a problem. When several consumers use zrem to claim a message, any given consumer is quite likely to lose the race. If it keeps reading like this, it may fail to claim anything for several rounds, which wastes resources. This can be optimized with a Lua script that makes zrangebyscore and zrem a single atomic operation, avoiding the contention between threads and the wasted failed claims.
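A script along these lines could combine the two commands. This is a minimal sketch and not necessarily the script used in the linked repository: the class name `AtomicPopScript`, the constant name, and the choice of `KEYS[1]` for the zset key and `ARGV[1]` for the current timestamp are all assumptions. The script text is held in a Java string so it can be passed to whichever client you use.

```java
class AtomicPopScript {
    // Lua script (assumed layout): KEYS[1] = zset key, ARGV[1] = current timestamp.
    // It fetches at most one due member and removes it in the same script run,
    // so no other client can claim that member in between.
    static final String POP_ONE_DUE = String.join("\n",
        "local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)",
        "if #due == 0 then",
        "  return false",   // a Lua false becomes a nil reply for the caller
        "end",
        "redis.call('ZREM', KEYS[1], due[1])",
        "return due[1]");

    public static void main(String[] args) {
        // Print the script so it can be inspected or pasted into an EVAL call.
        System.out.println(POP_ONE_DUE);
    }
}
```

With the EVAL command, the script would be invoked as `EVAL <script> 1 <key> <now>`. Because Redis runs a script as a single unit, a consumer either receives a message it already owns or receives nil, and never reads a message only to lose it at the zrem step.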