Redis는 정렬된 집합(ZSet)을 통해 지연된 메시지 대기열을 구현합니다. ZSet에는 지연된 실행 시간을 저장하는 데 사용할 수 있는 Score 속성이 있습니다.
하지만 작업을 확인하려면 무한 루프가 필요하므로 시스템 리소스가 소모됩니다
class RedisDelayQueue(object): """Simple Queue with Redis Backend dq = RedisDelayQueue('delay:commtrans') dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)}) print(dq.get()) """ def __init__(self, name, namespace='queue'): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db = get_redis_engine(database_name='spdb') self.key = '%s:%s' % (namespace, name) def qsize(self): """Return the approximate size of the queue.""" return self.__db.zcard(self.key) def empty(self): """Return True if the queue is empty, False otherwise.""" return self.qsize() == 0 def rem(self, value): return self.__db.zrem(self.key, value) def get(self): # 获取任务,以0和当前时间为区间,返回一条在当前区间的记录 items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1) if items: item = items[0] if self.rem(item): # 解决并发问题 如能删就让谁取走 return json.loads(item) return None def put(self, interval, item): """:param interval 延时秒数""" # 以时间作为score,对任务队列按时间戳从小到大排序 """Put item into the queue.""" d = json.dumps(item) return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
핵심 기능 중 하나인 최근 개발 부서의 새로운 프로젝트 즉, 다음과 같은 비즈니스 시나리오와 같이 사용자 행동을 기반으로 특정 시점에 해당 알림 메시지가 사용자에게 푸시됩니다.
사용자가 충전 항목을 클릭하고 반 동안 충전하지 않은 후 시간이 지나면 사용자에게 완료되지 않은 충전 알림을 푸시합니다.
사용자가 마지막으로 읽기 행동을 한 후 2시간이 지나면 계속 읽기 알림을 사용자에게 푸시합니다.
사용자가 앱을 새로 등록하거나 앱을 종료한 후 N분 후에 적절한 추천 메시지를 사용자에게 푸시합니다.
…
위 시나리오의 일반적인 특징은 이벤트가 트리거된 후 특정 작업을 실행하기 전에 일정 시간을 지연하는 것입니다. 이벤트 트리거 시간을 알면 위 논리도 가능합니다. 특정 작업을 수행하기 위해 지정된 시점(이벤트 트리거 시점 + 지연 시간 길이)에 해당합니다.
지연 대기열은 일반적으로 이러한 종류의 요구 사항을 달성하는 데 사용됩니다. 생성된 지연 메시지에는 작업 지연 시간 또는 작업 실행 시점과 같은 정보가 포함되어야 하며 작업이 시간 조건을 충족하고 실행되어야 합니다. 즉, 대기열에 있는 메시지가 소비되는 시점을 지정할 수 있다고 합니다.
독립형 환경에서 JDK
에는 이미 DelayQueue
, Timer 등 지연 대기열 기능을 구현할 수 있는 많은 구성 요소가 포함되어 있습니다.
, ScheduledExecutorService
및 기타 구성 요소는 지연된 작업을 쉽게 생성할 수 있습니다. 그러나 위 구성 요소를 사용하려면 일반적으로 서비스를 수행할 때 작업이 메모리에 저장되어야 합니다. 다시 시작되고 작업 규모가 제한됩니다. 메모리에 따라 양이 제한되고 장기간 메모리 사용량이 발생하며 일반적으로 작업 요구 사항이 낮은 단일 프로세스 클라이언트 프로그램에 적합합니다. JDK
已经自带了很多能够实现延时队列功能的组件,比如DelayQueue
, Timer
, ScheduledExecutorService
等组件,都可以较为简便地创建延时任务,但上述组件使用一般需要把任务存储在内存中,服务重启存在任务丢失风险,且任务规模体量受内存限制,同时也造成长时间内存占用,并不灵活,通常适用于单进程客服端程序中或对任务要求不高的项目中。
在分布式环境下,仅使用JDK
自带组件并不能可靠高效地实现延时队列,通常需要引入第三方中间件或框架。
比如常见的经典任务调度框架Quartz
或基于此框架的xxl-job
等其它框架,这些框架的主要功能是实现定时任务或周期性任务,在Redis
、RabbitMQ
还未广泛应用时,譬如常见的超时未支付取消订单等功能都是由定时任务实现的,通过定时轮询来判断是否已到达触发执行的时间点。
但由于定时任务需要一定的周期性,周期扫描的间隔时间不好控制,太短会造成很多无意义的扫描,且增大系统压力,太长又会造成执行时间误差太大,且可能造成单次扫描所处理的堆积记录数量过大。
此外,利用MQ
做延时队列也是一种常见的方式,比如通过RabbitMQ
的TTL
和死信队列实现消息的延迟投递,考虑到投递出去的MQ
消息无法方便地实现删除或修改,即无法实现任务的取消或任务执行时间点的更改,同时也不能方便地对消息进行去重,因此在项目中并未选择使用MQ
实现延时队列。
Redis
的数据结构zset
,同样可以实现延迟队列的效果,且更加灵活,可以实现MQ
无法做到的一些特性,因此项目最终采用Redis
实现延时队列,并对其进行优化与封装。
实现原理是利用zset
的score
属性,redis
会将zset
集合中的元素按照score
进行从小到大排序,通过zadd
命令向zset
中添加元素,如下述命令所示,其中value
值为延时任务消息,可根据业务定义消息格式,score
值为任务执行的时间点,比如13位毫秒时间戳。
zadd delayqueue 1614608094000 taskinfo
任务添加后,获取任务的逻辑只需从zset
中筛选score
值小于当前时间戳的元素,所得结果便是当前时间节点下需要执行的任务,通过zrangebyscore
命令来获取,如下述命令所示,其中timestamp
为当前时间戳,可用limit
JDK
에 내장된 구성 요소만으로는 지연 대기열을 안정적이고 효율적으로 구현할 수 없습니다. 일반적으로 타사 미들웨어나 프레임워크를 도입해야 합니다. 🎜🎜예를 들어, 일반적인 클래식 작업 스케줄링 프레임워크인 Quartz
또는 이 프레임워크 xxl-job
을 기반으로 하는 다른 프레임워크는 이러한 프레임워크의 주요 기능은 예약된 작업 또는 주기적 작업을 구현하는 것입니다. Redis
및 RabbitMQ
가 널리 사용되지 않는 경우에는 예약된 작업에 의해 시간 초과 및 미결제 주문 취소와 같은 일반적인 기능이 구현되어 주문이 완료되었는지 확인했습니다. 실행을 트리거하는 시점에 도달했습니다. 🎜🎜그러나 예약된 작업에는 일정한 주기가 필요하기 때문에 정기 검사 간격을 제어하기가 어렵습니다. 너무 짧으면 의미 없는 검사가 많이 발생하고 너무 길면 시스템 부담이 증가합니다. 너무 많은 실행 시간 오류 및 결과적으로 단일 스캔에서 처리되는 누적 레코드 수가 너무 많습니다. 🎜🎜또한 RabbitMQ
의 TTL
을 통해 메시지 처리를 구현하는 등 MQ
를 지연 대기열로 사용하는 일반적인 방법이기도 합니다. 배달된 MQ
메시지는 쉽게 삭제되거나 수정될 수 없다는 점, 즉 작업을 취소하거나 작업 실행 시점을 변경할 수 없으며 메시지를 변경할 수 없다는 점을 고려하여 지연된 배달 따라서 우리는 프로젝트에서 지연 대기열을 구현하기 위해 MQ
를 사용하지 않았습니다. 🎜🎜Redis
의 데이터 구조 zset
는 지연된 대기열의 효과도 얻을 수 있으며 MQ
의 일부 기능을 더 유연하게 실현할 수 있습니다. 달성할 수 없으므로 프로젝트는 마침내 Redis
를 사용하여 지연 대기열을 구현하고 이를 최적화하고 캡슐화합니다. 🎜🎜구현 원칙은 zset
의 score
속성을 사용하는 것입니다. redis
는 zset
의 요소를 설정합니다. >score
에 따른 컬렉션은 작은 것부터 큰 것 순으로 정렬되며, 다음과 같이 zadd
명령을 통해 zset
에 요소가 추가됩니다. 명령, 여기서 value
값은 지연된 작업 메시지이며 메시지 형식은 비즈니스에 따라 정의될 수 있습니다. score
값은 작업 실행 시점입니다. 13자리 밀리초 타임스탬프로 표시됩니다. 🎜zrangebyscore delayqueue 0 timestamp limit 0 1000
score
값이 zset
의 현재 타임스탬프보다 작은 요소만 필터링하면 되며 그 결과는 현재 시간 노드에서 실행되어야 하는 것입니다. 작업은 다음 명령에 표시된 대로 zrangebyscore
명령을 통해 가져옵니다. 여기서 timestamp
는 현재 타임스탬프이고 < code>limit는 한 번에 얻는 레코드 수가 너무 커지는 것을 방지하기 위해 각 가져오기 레코드 수를 제한하는 데 사용할 수 있습니다. 🎜zrangebyscore delayqueue 0 timestamp limit 0 1000
在实际实现过程中,从zset
中获取到当前需要执行的任务后,需要先确保将任务对应的元素从zset
中删除,删除成功后才允许执行任务逻辑,这样是为了在分布式环境下,当存在多个线程获取到同一任务后,利用redis
删除操作的原子性,确保只有一个线程能够删除成功并执行任务,防止重复执行。
实际任务的执行通常会再将其发送至MQ
异步处理,将“获取任务”与“执行任务”两者分离解耦,更加灵活,“获取任务”只负责拿到当前时间需要执行的任务,并不真正运行任务业务逻辑,因此只需相对少量的执行线程即可,而实际的任务执行逻辑则由MQ
消费者承担,方便调控负载能力。
整体过程如下图所示。
采用zset
做延时队列的另一个好处是可以实现任务的取消和任务执行时间点的更改,只需要将任务信息从zset
中删除,便可取消任务,同时由于zset
拥有集合去重的特性,只需再次写入同一个任务信息,但是value
值设置为不同的执行时间点,便可更改任务执行时间,实现单个任务执行时间的动态调整。
了解实现原理后,再进行具体编程实现。创建延时任务较为简便,准备好任务消息和执行时间点,写入zset
即可。获取延时任务最简单的方案是通过定时任务,周期性地执行上述逻辑,如下代码所示。
@XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue") public void scanBusiness1() { // 某业务逻辑的zset延迟队列对应的key String zsetKey = "delayqueue:business1"; while (true) { // 筛选score值小于当前时间戳的元素,一次最多拉取1000条 Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000); if (CollectionUtils.isEmpty(tasks)) { // 当前时间下已没有需要执行的任务,结束本次扫描 return; } for (String task : tasks) { // 先删除,再执行,确保多线程环境下执行的唯一性 Boolean delete = stringRedisTemplate.delete(task); if (delete) { // 删除成功后,将其再发送到指定MQ异步处理,将“获取任务”与“执行任务”分离解耦 rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task); } } } }
上述方案使用xxl-job
做分布式定时任务,间隔5秒执行一次,代码借助spring
提供的api
来完成redis
和MQ
的操作。
由于是分布式定时任务,每次执行只有一个线程在获取任务,机器利用率低,当数据规模较大时,单靠一个线程无法满足吞吐量要求,因此这种方案只适用于小规模数据量级别。
此处间隔时间也可适当调整,例如缩短为1秒,调整所需考虑原则在上文已提到:间隔太短会造成很多无意义的扫描,且增大系统压力,太长又会造成执行时间误差太大。
为了提升整体吞吐量,考虑不使用分布式定时任务,对集群内每台机器(或实例)均设置独立的定时任务,同时采用多个zset
队列,以数字后缀区分。
假设有M个zset
队列,创建延时消息时选取消息的某个ID
字段,计算hash
值再对M取余,根据余数决定发送到对应数字后缀的zset
队列中(分散消息,此处ID
字段选取需要考虑做到均匀分布,不要造成数据倾斜)。
队列数量M的选取需要考虑机器数量N,理想情况下有多少台机器就定义多少个队列,保持M与N基本相等即可。
因为队列太少,会造成机器对队列的竞争访问处理,队列太多又会导致任务得不到及时的处理。
为了灵活应对集群机器数量的变化,建议将队列数量配置为动态可调的,例如采用分布式配置中心的方案。
每台机器在触发定时任务时,需要通过适当的负载均衡来决定从哪个队列拉取消息,负载均衡的好坏也会影响整个集群的效率,如果负载分布不均可能会导致多台机器竞争处理同一队列,降低效率。
一个简单实用的做法是利用redis
的自增操作再对队列数量取余即可,只要保持队列数量和机器数量基本相等,这种做法在很大程度上就可以保证不会有多台机器竞争同一队列。
至于每台机器从对应zset
中的任务获取逻辑,仍然和前面代码一致。以上方式简化实现代码如下所示。
@Scheduled(cron = "0/5 * * * * ?") public void scanBusiness1() { // 队列数量M,考虑动态配置,保持和机器数量基本一致 int M = 10; // redis自增key,用于负载均衡 String incrKey = "incrkey:delayqueue:business1"; // 每台机器执行时,从不同的zset中拉取消息,尽量确保不同机器访问不同zset String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M); while (true) { // 此处逻辑和前面代码一致,省略。。。 } }
上述方案和第一种方案的主要的不同点在于zsetKey
的获取上,这里是根据负载均衡算法算出来的,确保每台机器访问不同zset
并拉取消息,同时定时任务采用spring
提供的进程内注解@Scheduled
,集群内每台机器都会间隔5秒执行,因此相比之前的方案,能够较为明显地提升整个集群的吞吐量。
但是这种方案的步骤相对更为复杂,需要动态配置队列数量,同时在创建延时任务时需要选择合适的消息ID
字段来决定发送的目标zset
队列,此处还要考虑均匀分布,整体实现要考虑的因素较多。
上面一种方案已经能够较好地满足整体吞吐量要求,但其缺点是步骤相对复杂,因此项目中没有采用这种方案,而是采用下面一种也能满足吞吐量要求,步骤相对简单,又方便通用化的方案。
该方案不使用定时任务,而是单独启动后台线程,在线程中执行永久循环,每次循环逻辑为:从目标zset
中获取score
值小于当前时间戳的元素集合中的score
最小的那个元素,相当于获取当前时间点需要执行且执行时间点最早的那个任务,如果获取不到,表示当前时间点下暂无需要执行的任务,则线程休眠100ms
(可视情况调整),否则,对获取到的元素进行处理,在分布式多线程环境下,仍然需要先删除成功才能进行处理。
此外,考虑到每个线程获取元素后都需要再次访问redis
尝试删除操作,为了避免多线程争抢浪费资源,降低效率,这里采用lua
脚本将获取和删除操作原子化。lua
脚本逻辑代码如下所示。
local zsetKey = 'delayqueue' local timestamp = 1614608094000 local items = redis.call('zrangebyscore',zsetKey,0,timestamp,'limit',0,1) if #items == 0 then return '' else redis.call('zremrangebyrank',zsetKey,0,0) return items[1] end
其中timestamp
为当前时间戳,通过在zrangebyscore
命令中指定limit
为1来获取score
最小的元素,若获取不到,即结果集长度为0,则返回空字符串,否则,通过zremrangebyrank
命令删除头部元素,即score
最小的元素,也就是之前获取到的那个元素,由于redis
内部保证lua
脚本的原子性,上述获取并删除的操作能够运行无误。具体JAVA
实现中还对其进行了多线程操作的封装和通用化的抽象,使不同业务都能够使用该组件实现延时队列。具体实现代码如下所示。
/** * 基于ZSET实现消息延迟处理,score存储执行时间点,到达时间点即会向指定队列发送该消息; * 定义一个继承本类的bean即可; */ public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean { private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT; static { // 获取并删除的lua脚本,使用spring提供的api String sb = "local items = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)\n" + "if #items == 0 then\n" + "\treturn ''\n" + "else\n" + "\tredis.call('zremrangebyrank',KEYS[1],0,0)\n" + "\treturn items[1]\n" + "end"; // 自有工具类,只要能创建出spring包下的 RedisScript 的实现类对象均可 TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class); } private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(), 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix())); private volatile boolean quit = false; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void startScan() { // bean构建完成后,启动若干执行线程 int threadNum = getThreadNum(); for (int i = 0; i < threadNum; i++) { EXECUTOR.execute(this); } } @Override public void run() { while (!quit) { try { // 循环,采用lua获取当前需要执行的任务并将其从redis中删除 String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT, Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis())); if (StringUtils.isNotBlank(msg)) { // 消息不为空,表示获取任务成功,将其再发送到指定MQ异步处理,将“获取任务”与“执行任务”分离解耦 rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg); } else { // 获取不到任务,表示当前时间点下暂无需要执行的任务,则线程休眠1S(可视情况调整) SleepUtils.sleepSeconds(1); } } catch (Exception e) { Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e); } } } @Override public void destroy() throws Exception { quit = true; } public void setQuit(boolean quit) { this.quit = quit; } /** * 获取消息的工作线程数量 */ protected abstract int getThreadNum(); /** * 线程名称前缀,方便问题定位 */ protected abstract String getThreadNamePrefix(); /** * 存放延迟消息的ZSET队列名 */ protected abstract String getDelayedMsgSourceKey(); /** * 消息到达执行时间点时将其通过指定 exchange 发送到实时消费队列中 */ protected abstract String getSendExchange(); /** * 消息到达执行时间点时将其通过指定 routingKey 发送到实时消费队列中 */ protected abstract String getSendRoutingKey(); }
在具体业务应用中,只需定义一个继承上述类的bean
即可,需要实现的方法主要是提供一些配置,比如该业务对应的zset
延时队列名称,同时工作拉取消息的线程数量,由于采用rabbitMq
,因此这里需要提供exchange
和routingKey
。
实际使用中只需向该zset
队列中添加消息,并将score
设为该任务需要执行的时间点(此处为13位毫秒时间戳),则到该时间点后,上述组件便会将该消息从zset
中取出并删除,再将其通过指定的路由发送到实时MQ
消费队列中,由消费者负责执行任务业务逻辑。目前该组件在项目中正常平稳运行。
注意:
本文结合项目中的实际需求介绍了延时队列的应用场景,分析了延时队列的多种实现,重点讲述了利用redis
实现延时队列的原理,对其实现方案进行比较与优化,并将最终方案实际运用于项目需求中。
위 내용은 Redis에서 지연 대기열을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!