浅谈Redis中消息队列和延时消息队列的实现方法
Redis如何实现消息队列与延时消息队列?下面本篇文章给大家介绍一下Redis中消息队列和延时消息队列的实现方法,希望对大家有所帮助!
提到redis,更多的可能想到用作缓存的用途,其实redis也可以实现一些简单的消息队列用途,我们可以使用 list 数据结构实现队列。【相关推荐:Redis视频教程】
list的几个命令
lpush (left push)
由队列的左边存放进去
rpush (right push)
由队列的右边存放进去
lpop (left pop)
由队列的左边取出来
rpop (right pop)
由队列的右边取出来
以上的四个命令,可以让 list 帮我们实现队列 或者 栈,队列的特性是先进先出,栈的特性是先进后出,
所以队列的实现可以使用 lpush + rpop 或者 rpush + lpop,
栈的实现则是lpush + lpop 或者 rpush + rpop。
使用命令演示队列
生产者发布消息
首先我们使用 rpush 对一个叫做notify-queue的队列,增加五个元素,即 1 2 3 4 5,也就是作为生产者发布消息啦
消费者消费消息
既然生产者使用的是rpush,那么消费者就要用lpop,可以看下下图,我们不停对notify-queue进行消息消费,并且是按照顺序的,从1一直到5,按顺序读出,最终队列中没有消息了,弹出的则一直是空
空轮询问题
在上面使用lpop消费消息时,可以看到,消息消费完后,我们每次再去pop时,读到的都是一个空的信息,
上面是手动执行命令,但是如果是写好的代码程序不停的去pop数据(拉取数据)的话,会造成空轮询(无用的读取),
既拉高了客户端的CPU消耗,又拉高了redis的QPS,并且还是无用操作,这些无用操作可能会造成其他客户端对redis的访问变得响应缓慢。
解决方案A (休眠)
既然空轮询会让客户端和redis的资源消耗都会变得较高,那么我们可以让客户端在收到空数据的时候,进行1s的休眠,1s后再进行数据拉取,这样可以降低消耗
Thread.sleep(1000)
这个方案也是存在瑕疵的,即消息消费延迟性增大了,如果只有一个消费者的话,延迟就是1s,即空轮询后,正好休眠了,但是这时候刚好有消息过来了,还是要等到1s醒来后才能消费,
如果有多个消费者的话,由于每个消费者的睡眠时间是岔开的,会降低一些延迟性,但是有没有办法更好的方法,可以做到几乎 0 延迟?
解决方案B (阻塞读)
redis中关于队列取数据其实还有两个命令,即阻塞读取,
blpop (blocking left pop)
brpop (blocking right pop)
阻塞读在队列没有数据的时候,会进入休眠状态,一旦有消息来了以后,则立刻做出反应,读取数据,因此使用 blpop/brpop 替换 lpop/rpop 则可以解决消息延迟性的问题,
继续入队3个属性,6、7、8
使用 blpop 进行队列的读取,最后一个参数是阻塞读的等待时间,如果超过这个时间还没有消息,将会返回nil,此时可以继续重复blpop操作,
阻塞读的空闲连接自动断开问题
客户端使用阻塞读时,如果阻塞的时间过长,服务一般会当成空闲连接,从而对其进行主动断开,减少无用的连接占用资源,这个时候客户端会抛出异常,
所以请注意,在客户端使用阻塞读的时候,要进行异常的捕获,从而做出相应的处理,例如重试。
java客户端实现消息队列
思路和上述一样,只是由命令行客户端redis-cli变成了java语言,一个线程或多个线程进行 rpush 的发布,
另外一个或多个线程进行 blpop 消费,完成的代码在:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue
发布者
订阅者
延时队列的实现思路
延时队列指的是,消息发送的一段时间后,再由消费者进行消费,而不是发送过去后,消费者就能立即读取到,
zset的可以帮我们做到这个事情,首先zset可以通过score进行排序,score可以存一个时间戳,所以我们每次发布消息的时候,用当前时间戳加上延时的时间戳,
随后消费者取消息的时候,通过截取zset的数据,取到已经满足当前时间的消息(即取score小于等于当前时间戳的数据,score小于等于当前时间戳代表消息已经到时间了,如果大于的话,说明还得等一会才能消费)。
关键命令 zadd (发布者),zrangebyscore(订阅者),zrem (订阅者消费完数据后删除)
命令实现
我们使用zadd添加了4个数据,分别是1、2、3秒(伪说法,这里其实只是个score)后才能消费的数据,还有一个10秒后才能消费的kafka,
假如现在已经到了第三秒,我们取zset中大于等于1秒的和小于等于3秒的数据,因为这个区间的数据正好是我们可以消费的,可以看到,我们取出了符合条件的3条数据,
如果每次只能消费一个数据的话,可以加一个limit限制条件,可以看下图取出了第一个可以消费的数据,redis
同时注意,和list的lpop/和blpop不同(它们弹出即自动删除原始队列里的该数据),
虽然获取到了数据,但是如果不使用zrem进行删除的话,这条数据还会被其他人读到,因为他还一直存在zset中,
不过zrem可能会发生已经被别人抢先一步删除(消费)的情况,所以代码中还需要根据zrem的返回值是否大于0判断,本次消息我们是否抢占成功,成功后再进行正确消费。
代码实现
发布者
订阅者
测试延迟效果
完整代码地址:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue
优化, 使用lua实现
上面实现的延迟队列中,有一个问题,就是使用zrem判断是否抢到这个数据时,很有可能没有抢到,这样继续进行读取,可能几轮都抢不到,资源白白浪费了,所以可以通过lua脚本,来进行优化,
让zrangebyscore 和 zrem成为一个原子化操作,这就可以避免多线程争抢,抢不到的资源浪费了。
结语
一些专业的队列中间件,应用起来会比较复杂并且增加运维成本,例如RabbitMQ,发消息前需要创建Exchange交换机,再创建Queue,随后Exchange和Queue要进行绑定,发消息的时候还要指定routing-key 才能和Exchange匹配,最终到达Queue中,
如果场景简单的话,可以使用redis实现一个队列,但是需要注意,redis没有专业队列的特性,没有ack的保证,也就是说消息是不可靠的,消费失败后,就没有了,如果需要百分百的可靠性,还是需要采用专业的队列中间件的ack等机制作为保障。
更多编程相关知识,请访问:编程入门!!
以上是浅谈Redis中消息队列和延时消息队列的实现方法的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

Redis集群模式通过分片将Redis实例部署到多个服务器,提高可扩展性和可用性。搭建步骤如下:创建奇数个Redis实例,端口不同;创建3个sentinel实例,监控Redis实例并进行故障转移;配置sentinel配置文件,添加监控Redis实例信息和故障转移设置;配置Redis实例配置文件,启用集群模式并指定集群信息文件路径;创建nodes.conf文件,包含各Redis实例的信息;启动集群,执行create命令创建集群并指定副本数量;登录集群执行CLUSTER INFO命令验证集群状态;使

Redis 使用哈希表存储数据,支持字符串、列表、哈希表、集合和有序集合等数据结构。Redis 通过快照 (RDB) 和追加只写 (AOF) 机制持久化数据。Redis 使用主从复制来提高数据可用性。Redis 使用单线程事件循环处理连接和命令,保证数据原子性和一致性。Redis 为键设置过期时间,并使用 lazy 删除机制删除过期键。

解决redis-server找不到问题的步骤:检查安装,确保已正确安装Redis;设置环境变量REDIS_HOST和REDIS_PORT;启动Redis服务器redis-server;检查服务器是否运行redis-cli ping。

要查看 Redis 中的所有键,共有三种方法:使用 KEYS 命令返回所有匹配指定模式的键;使用 SCAN 命令迭代键并返回一组键;使用 INFO 命令获取键的总数。

理解 Redis 源码的最佳方法是逐步进行:熟悉 Redis 基础知识。选择一个特定的模块或功能作为起点。从模块或功能的入口点开始,逐行查看代码。通过函数调用链查看代码。熟悉 Redis 使用的底层数据结构。识别 Redis 使用的算法。

使用 Redis 指令需要以下步骤:打开 Redis 客户端。输入指令(动词 键 值)。提供所需参数(因指令而异)。按 Enter 执行指令。Redis 返回响应,指示操作结果(通常为 OK 或 -ERR)。

要查看 Redis 版本号,可以使用以下三种方法:(1) 输入 INFO 命令,(2) 使用 --version 选项启动服务器,(3) 查看配置文件。

可以采用以下两种方法清除 Redis 中的数据:FLUSHALL 命令:删除数据库中所有键和值。CONFIG RESETSTAT 命令:重置数据库所有状态(包括键、值和其他统计信息)。
