This article walks through an advanced use of Redis, the message queue, and introduces how to build a delay queue in Redis. I hope it is helpful to everyone!
When it comes to message queue middleware, we usually think of RabbitMQ, RocketMQ, and Kafka for adding asynchronous messaging to an application. These are specialized message queue middleware with far more features than most of us will ever use.
Using this middleware is complicated. Take RabbitMQ: before sending a message, you need to create an Exchange and a Queue and then bind them through some rules. When sending a message, you also need to specify a routing key and control the message headers. And that is only the producer side; consumers have to go through a similar series of cumbersome steps before they can consume messages.
So if you do not require 100% reliability and only need a simple message queue, you can use Redis and skip the cumbersome setup of message queue middleware.
A Redis message queue is not a professional message queue. It lacks many advanced message queue features and has no ack guarantee. If message reliability is critical to you, use professional MQ middleware instead.
Let's start with the simplest asynchronous message queue. The Redis list data structure is commonly used as an asynchronous message queue: rpush/lpush enqueues a message, and rpop/lpop dequeues one.
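To make the push and pop directions concrete, here is a small in-memory stand-in for a Redis list. It does not talk to Redis; the class name ListQueueModel is made up for illustration, and it only mirrors which end of the list each command works on.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of a Redis list, for illustration only.
class ListQueueModel {
    private final Deque<String> list = new ArrayDeque<>();

    void rpush(String value) { list.addLast(value); }   // append at the tail (right)
    void lpush(String value) { list.addFirst(value); }  // insert at the head (left)

    // Both pops return null on an empty list, much like a nil reply from Redis.
    String lpop() { return list.pollFirst(); }
    String rpop() { return list.pollLast(); }
}
```

Pushing on one end and popping from the other (rpush with lpop, or lpush with rpop) gives first-in, first-out queue behaviour. Pushing and popping on the same end behaves like a stack instead.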
With a plain pop, when the message queue is empty the client falls into an endless loop of pop calls. This idle polling is wasted effort: it drives up the client's CPU usage and, at the same time, Redis's QPS.
The solution is to dequeue with blpop/brpop of the list structure, where the b prefix stands for blocking. A blocking read goes to sleep while the queue is empty and wakes up as soon as data arrives, which solves the polling problem.
The blocking read looks perfect, but it brings another problem: idle connections. If the thread stays blocked for a long time, the Redis client connection becomes idle. If it stays idle for too long, the connection may be closed from the server side to free idle resources. When that happens, blpop/brpop throws an exception.
So take care when writing the consumer in your client (application): catch the exception and retry.
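The catch-and-retry advice could look roughly like the sketch below. BlockingPop and RetryingConsumer are hypothetical names introduced here. With Jedis the source could be a lambda such as (timeout, key) -> jedis.blpop(timeout, key), ideally borrowing a fresh connection from a pool on each call, but that wiring is an assumption and is not shown.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical abstraction over a blocking pop such as BLPOP.
// Assumed contract: returns [key, value] when a message arrives,
// and null or an empty list when the timeout expires.
interface BlockingPop {
    List<String> pop(int timeoutSeconds, String key) throws Exception;
}

class RetryingConsumer {
    private final BlockingPop source;
    private final String key;
    private final int timeoutSeconds;
    private final long retryDelayMillis;

    RetryingConsumer(BlockingPop source, String key, int timeoutSeconds, long retryDelayMillis) {
        this.source = source;
        this.key = key;
        this.timeoutSeconds = timeoutSeconds;
        this.retryDelayMillis = retryDelayMillis;
    }

    // Handles up to maxMessages messages and returns how many were handled.
    int run(Consumer<String> handler, int maxMessages) {
        int handled = 0;
        while (handled < maxMessages && !Thread.currentThread().isInterrupted()) {
            List<String> reply;
            try {
                reply = source.pop(timeoutSeconds, key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // asked to stop
                break;
            } catch (Exception e) {
                // For example, the connection was closed while we were blocked.
                // Wait a little, then try the pop again.
                try {
                    Thread.sleep(retryDelayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
                continue;
            }
            if (reply == null || reply.size() < 2) {
                continue; // timed out with no message; block again
            }
            handler.accept(reply.get(1)); // element 0 is the key, element 1 the message
            handled++;
        }
        return handled;
    }
}
```

Only the pop itself is wrapped in the retry, so an exception thrown by the handler is not mistaken for a connection problem. A finite timeoutSeconds also means the consumer regularly returns from the blocking call instead of waiting forever.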
In Redis distributed locks, there are generally three strategies to deal with locking failures:
Throw an exception directly, and let the front end ask the user whether to continue the operation;
Sleep for a while and try again;
Put the request in a delay queue and try again after a while.
As for the delay queue in Redis, we can implement it with the zset (sorted set) data structure. We serialize the message into a string and use it as the zset member (value), with the time at which the message becomes due (the current time plus the delay) as its score. Consumers then poll the zset for members whose due time has passed and remove each one from the zset with zrem. A successful zrem means this consumer has claimed the message, and it then processes the task.
The core code is as follows:
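import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import redis.clients.jedis.Jedis;

// The two methods below belong to a queue class with type parameter T.
// Its fields jedis, queueKey and TaskType (the fastjson Type of TaskItem<T>) are not shown here.

// Producer
public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // assign a unique uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // serialize with fastjson
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // put into the delay queue, retry in 5s
}

// Consumer
public void loop() {
    while (!Thread.interrupted()) {
        // The score range 0 .. System.currentTimeMillis() selects members that are already due;
        // offset 0, count 1 takes a single member starting from the first match.
        // Aside: -inf and +inf stand for the lowest and highest possible score,
        // which is handy when you do not know the score bounds of the zset.
        Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
        if (values.isEmpty()) {
            try {
                Thread.sleep(500); // nothing due yet, rest a moment and poll again
            } catch (InterruptedException e) {
                break;
            }
            continue;
        }
        String s = values.iterator().next(); // take the due message
        // Several threads may fetch the same member; only the one whose zrem succeeds
        // has claimed the message and is allowed to process it.
        if (jedis.zrem(queueKey, s) > 0) {
            TaskItem<T> task = JSON.parseObject(s, TaskType); // deserialize with fastjson
            this.handleMsg(task.msg);
        }
    }
}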
With multiple threads, the same task may be fetched by several consumers at once. The zrem check ensures that only one of them goes on to process it, so a task is never consumed more than once. But the threads that fetched the task and then lost the zrem race did that work for nothing. You can consider optimizing this logic with a Lua script: moving zrangeByScore and zrem to the server so they run together as one atomic operation removes the wasted fetches.
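One possible shape for that optimization is sketched below. The Lua script fetches one due member and removes it in a single server-side step. ScriptRunner and AtomicDelayQueuePoller are hypothetical names introduced here. With Jedis the runner could be (script, keys, args) -> jedis.eval(script, keys, args), and the assumption is that a nil reply comes back to Java as null.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical abstraction over the Redis EVAL command.
interface ScriptRunner {
    Object eval(String script, List<String> keys, List<String> args);
}

class AtomicDelayQueuePoller {
    // KEYS[1] = the zset key, ARGV[1] = the current time in milliseconds.
    // Returning false from Lua yields a nil reply when nothing is due.
    static final String POP_DUE_SCRIPT =
        "local due = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)\n" +
        "if #due == 0 then return false end\n" +
        "redis.call('ZREM', KEYS[1], due[1])\n" +
        "return due[1]\n";

    private final ScriptRunner runner;
    private final String queueKey;

    AtomicDelayQueuePoller(ScriptRunner runner, String queueKey) {
        this.runner = runner;
        this.queueKey = queueKey;
    }

    // Returns one due message that this caller now owns, or null if nothing is due.
    String pollDue(long nowMillis) {
        Object reply = runner.eval(
            POP_DUE_SCRIPT,
            Collections.singletonList(queueKey),
            Collections.singletonList(Long.toString(nowMillis)));
        return reply == null ? null : reply.toString();
    }
}
```

Because the script runs atomically on the server, a consumer that gets a non-null result already owns the message, so the separate zrem check in the polling loop is no longer needed. The consumer still has to sleep briefly when pollDue returns null.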