Maison > base de données > Redis > Comment implémenter une file d'attente différée dans Redis

Comment implémenter une file d'attente différée dans Redis

WBOY
Libérer: 2023-05-26 16:44:23
avant
2399 Les gens l'ont consulté

    Redis implémente la file d'attente différée

    Redis delay queue

    Redis est implémenté via des collections ordonnées ( ZSet) est utilisé pour implémenter des files d'attente de messages retardés. ZSet possède un attribut Score qui peut être utilisé pour stocker le temps d'exécution retardé.

    Mais cela nécessite une boucle infinie pour vérifier la tâche, ce qui consommera des ressources système

    class RedisDelayQueue(object):
        """Simple Queue with Redis Backend
        dq = RedisDelayQueue('delay:commtrans')
        dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)})
        print(dq.get())
        """
        def __init__(self, name, namespace='queue'):
            """The default connection parameters are: host='localhost', port=6379, db=0"""
            self.__db = get_redis_engine(database_name='spdb')
            self.key = '%s:%s' % (namespace, name)
        def qsize(self):
            """Return the approximate size of the queue."""
            return self.__db.zcard(self.key)
        def empty(self):
            """Return True if the queue is empty, False otherwise."""
            return self.qsize() == 0
        def rem(self, value):
            return self.__db.zrem(self.key, value)
        def get(self):
            # 获取任务,以0和当前时间为区间,返回一条在当前区间的记录
            items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1)
            if items:
                item = items[0]
                if self.rem(item):  # 解决并发问题  如能删就让谁取走
                    return json.loads(item)
            return None
        def put(self, interval, item):
            """:param interval 延时秒数"""
            # 以时间作为score,对任务队列按时间戳从小到大排序
            """Put item into the queue."""
            d = json.dumps(item)
            return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
    Copier après la connexion

    Solution d'optimisation Redis pour implémenter la file d'attente de retard

    Application de delay queue# 🎜🎜#

    L'une des fonctions clés des nouveaux projets récents du département de développement est le push intelligent, qui consiste à envoyer des messages de rappel correspondants aux utilisateurs à des moments précis en fonction du comportement de l'utilisateur, comme le scénarios commerciaux suivants :

    • Après que l'utilisateur clique sur l'élément de recharge et ne recharge pas dans la demi-heure, un rappel de recharge incomplète sera envoyé à l'utilisateur.

    • Envoyez un rappel de poursuite de la lecture à l'utilisateur 2 heures après le dernier comportement de lecture de l'utilisateur.

    • N minutes après que l'utilisateur s'est nouvellement inscrit ou a quitté l'application, envoyez des messages de recommandation appropriés à l'utilisateur.

    La caractéristique commune des scénarios ci-dessus est de retarder pendant un certain temps après le déclenchement d'un événement. Pour exécuter une tâche spécifique, si le moment de déclenchement de l'événement est connu, la logique ci-dessus peut également être équivalente à l'exécution d'une tâche spécifique à un moment donné (point temporel de déclenchement de l'événement + durée du délai).

    Les files d'attente de retard sont généralement utilisées pour répondre à ce type d'exigences. Les messages de retard créés doivent contenir des informations telles que le délai d'exécution de la tâche ou le moment où la tâche répond aux conditions temporelles et doit l'être. exécuté, le message sera consommé, ce qui signifie que vous pouvez spécifier le moment auquel les messages de la file d'attente sont consommés.

    Implémentation de la file d'attente différée

    Dans un environnement autonome, JDK est déjà livré avec de nombreux composants pouvant implémenter la fonction de file d'attente différée, tels que < Code>DelayQueue, Timer, ScheduledExecutorService et d'autres composants peuvent facilement créer des tâches retardées, mais l'utilisation des composants ci-dessus nécessite généralement que la tâche soit stockée dans mémoire. Il existe un risque de perte de tâche lors du redémarrage du service, et la taille de la tâche est limitée par la mémoire. Cela entraîne également une utilisation de la mémoire à long terme et n'est généralement pas adapté aux programmes clients à processus unique. des projets qui n'ont pas d'exigences de tâches élevées.

    JDK已经自带了很多能够实现延时队列功能的组件,比如DelayQueue, Timer, ScheduledExecutorService等组件,都可以较为简便地创建延时任务,但上述组件使用一般需要把任务存储在内存中,服务重启存在任务丢失风险,且任务规模体量受内存限制,同时也造成长时间内存占用,并不灵活,通常适用于单进程客服端程序中或对任务要求不高的项目中。

    在分布式环境下,仅使用JDK自带组件并不能可靠高效地实现延时队列,通常需要引入第三方中间件或框架。

    比如常见的经典任务调度框架Quartz或基于此框架的xxl-job等其它框架,这些框架的主要功能是实现定时任务或周期性任务,在RedisRabbitMQ还未广泛应用时,譬如常见的超时未支付取消订单等功能都是由定时任务实现的,通过定时轮询来判断是否已到达触发执行的时间点。

    但由于定时任务需要一定的周期性,周期扫描的间隔时间不好控制,太短会造成很多无意义的扫描,且增大系统压力,太长又会造成执行时间误差太大,且可能造成单次扫描所处理的堆积记录数量过大。

    此外,利用MQ做延时队列也是一种常见的方式,比如通过RabbitMQTTL和死信队列实现消息的延迟投递,考虑到投递出去的MQ消息无法方便地实现删除或修改,即无法实现任务的取消或任务执行时间点的更改,同时也不能方便地对消息进行去重,因此在项目中并未选择使用MQ实现延时队列。

    Redis的数据结构zset,同样可以实现延迟队列的效果,且更加灵活,可以实现MQ无法做到的一些特性,因此项目最终采用Redis实现延时队列,并对其进行优化与封装。

    实现原理是利用zsetscore属性,redis会将zset集合中的元素按照score进行从小到大排序,通过zadd命令向zset中添加元素,如下述命令所示,其中value值为延时任务消息,可根据业务定义消息格式,score值为任务执行的时间点,比如13位毫秒时间戳。

    zadd delayqueue 1614608094000 taskinfo
    Copier après la connexion

    任务添加后,获取任务的逻辑只需从zset中筛选score值小于当前时间戳的元素,所得结果便是当前时间节点下需要执行的任务,通过zrangebyscore命令来获取,如下述命令所示,其中timestamp为当前时间戳,可用limitDans un environnement distribué, l'utilisation seule des composants intégrés du JDK ne peut pas implémenter une file d'attente de retard de manière fiable et efficace. Il est généralement nécessaire d'introduire un middleware ou un framework tiers.

    #🎜🎜#Par exemple, le framework de planification de tâches classique commun Quartz ou d'autres frameworks basés sur ce framework xxl-job La fonction principale de ces frameworks est. pour implémenter des tâches ou des tâches périodiques, lorsque Redis et RabbitMQ n'étaient pas largement utilisés, des fonctions courantes telles que le délai d'attente et l'annulation de commandes non payées étaient toutes implémentées par des tâches planifiées. déterminer si le moment opportun pour déclencher l’exécution a été atteint. #🎜🎜##🎜🎜#Cependant, étant donné que les tâches planifiées nécessitent une certaine périodicité, l'intervalle entre les analyses périodiques est difficile à contrôler s'il est trop court, cela entraînera de nombreuses analyses inutiles et augmentera la pression du système. trop long, cela entraînera un temps d'exécution trop important et peut entraîner un nombre trop important d'enregistrements empilés traités en une seule analyse. #🎜🎜##🎜🎜# De plus, il s'agit également d'un moyen courant d'utiliser MQ comme file d'attente retardée, par exemple via le TTL< de <code>RabbitMQ. /code> et La file d'attente des lettres mortes implémente la livraison retardée des messages étant donné que les messages MQ livrés ne peuvent pas être facilement supprimés ou modifiés, c'est-à-dire l'annulation de la tâche ou la modification du temps d'exécution de la tâche. Le point ne peut pas être réalisé. En même temps, il ne peut pas être facilement supprimé ou modifié. Les messages sont dédupliqués, donc MQ n'a pas été choisi pour implémenter la file d'attente différée dans le projet. #🎜🎜##🎜🎜#La structure de données Redis zset peut également obtenir l'effet d'une file d'attente retardée, est plus flexible et peut implémenter MQ< /code> Certaines fonctionnalités ne peuvent pas être réalisées, c'est pourquoi le projet utilise finalement <code>Redis pour implémenter la file d'attente différée, l'optimise et l'encapsule. #🎜🎜##🎜🎜#Le principe d'implémentation est d'utiliser l'attribut score de zset, et redis collectera zset< /code> Les éléments sont triés de petit à grand selon <code>score, et les éléments sont ajoutés à zset via la commande zadd, comme indiqué dans la commande suivante, où value est un message de tâche retardé et le format du message peut être défini en fonction de l'entreprise. La valeur score est le moment de la tâche. exécution, comme un horodatage à 13 chiffres en millisecondes. #🎜🎜#
    zrangebyscore delayqueue 0 timestamp limit 0 1000
    Copier après la connexion
    Copier après la connexion
    #🎜🎜#Une fois la tâche ajoutée, la logique pour obtenir la tâche n'a besoin que de filtrer les éléments dont la valeur score est inférieure à l'horodatage actuel de zset</ code>, et le résultat est Les tâches qui doivent être exécutées au nœud temporel actuel peuvent être obtenues via la commande <code>zrangebyscore, comme indiqué dans la commande suivante, où timestamp est l'horodatage actuel, et limit</code peut être utilisé >Limiter le nombre d'enregistrements extraits à chaque fois pour éviter que le nombre d'enregistrements obtenus à la fois ne soit trop grand. #🎜🎜#<div class="code" style="position:relative; padding:0px; margin:0px;"><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">zrangebyscore delayqueue 0 timestamp limit 0 1000</pre><div class="contentsignin">Copier après la connexion</div></div><div class="contentsignin">Copier après la connexion</div></div><p>在实际实现过程中,从<code>zset中获取到当前需要执行的任务后,需要先确保将任务对应的元素从zset中删除,删除成功后才允许执行任务逻辑,这样是为了在分布式环境下,当存在多个线程获取到同一任务后,利用redis删除操作的原子性,确保只有一个线程能够删除成功并执行任务,防止重复执行。

    实际任务的执行通常会再将其发送至MQ异步处理,将“获取任务”与“执行任务”两者分离解耦,更加灵活,“获取任务”只负责拿到当前时间需要执行的任务,并不真正运行任务业务逻辑,因此只需相对少量的执行线程即可,而实际的任务执行逻辑则由MQ消费者承担,方便调控负载能力。

    整体过程如下图所示。

    Comment implémenter une file dattente différée dans Redis

    采用zset做延时队列的另一个好处是可以实现任务的取消和任务执行时间点的更改,只需要将任务信息从zset中删除,便可取消任务,同时由于zset拥有集合去重的特性,只需再次写入同一个任务信息,但是value值设置为不同的执行时间点,便可更改任务执行时间,实现单个任务执行时间的动态调整。

    了解实现原理后,再进行具体编程实现。创建延时任务较为简便,准备好任务消息和执行时间点,写入zset即可。获取延时任务最简单的方案是通过定时任务,周期性地执行上述逻辑,如下代码所示。

    @XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue")
    public void scanBusiness1() {
    	// 某业务逻辑的zset延迟队列对应的key
    	String zsetKey = "delayqueue:business1";
    	while (true) {
    		// 筛选score值小于当前时间戳的元素,一次最多拉取1000条
    		Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000);
    		if (CollectionUtils.isEmpty(tasks)) {
    			// 当前时间下已没有需要执行的任务,结束本次扫描
    			return;
    		}
    		for (String task : tasks) {
    			// 先删除,再执行,确保多线程环境下执行的唯一性
    			Boolean delete = stringRedisTemplate.delete(task);
    			if (delete) {
    				// 删除成功后,将其再发送到指定MQ异步处理,将“获取任务”与“执行任务”分离解耦
    				rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task);
    			}
    		}
    	}
    }
    Copier après la connexion

    上述方案使用xxl-job做分布式定时任务,间隔5秒执行一次,代码借助spring提供的api来完成redisMQ的操作。

    由于是分布式定时任务,每次执行只有一个线程在获取任务,机器利用率低,当数据规模较大时,单靠一个线程无法满足吞吐量要求,因此这种方案只适用于小规模数据量级别。

    此处间隔时间也可适当调整,例如缩短为1秒,调整所需考虑原则在上文已提到:间隔太短会造成很多无意义的扫描,且增大系统压力,太长又会造成执行时间误差太大。

    为了提升整体吞吐量,考虑不使用分布式定时任务,对集群内每台机器(或实例)均设置独立的定时任务,同时采用多个zset队列,以数字后缀区分。

    假设有Mzset队列,创建延时消息时选取消息的某个ID字段,计算hash值再对M取余,根据余数决定发送到对应数字后缀的zset队列中(分散消息,此处ID字段选取需要考虑做到均匀分布,不要造成数据倾斜)。

    队列数量M的选取需要考虑机器数量N,理想情况下有多少台机器就定义多少个队列,保持MN基本相等即可。

    因为队列太少,会造成机器对队列的竞争访问处理,队列太多又会导致任务得不到及时的处理。

    为了灵活应对集群机器数量的变化,建议将队列数量配置为动态可调的,例如采用分布式配置中心的方案。

    每台机器在触发定时任务时,需要通过适当的负载均衡来决定从哪个队列拉取消息,负载均衡的好坏也会影响整个集群的效率,如果负载分布不均可能会导致多台机器竞争处理同一队列,降低效率。

    一个简单实用的做法是利用redis的自增操作再对队列数量取余即可,只要保持队列数量和机器数量基本相等,这种做法在很大程度上就可以保证不会有多台机器竞争同一队列。

    至于每台机器从对应zset中的任务获取逻辑,仍然和前面代码一致。以上方式简化实现代码如下所示。

    @Scheduled(cron = "0/5 * * * * ?")
    public void scanBusiness1() {
    	// 队列数量M,考虑动态配置,保持和机器数量基本一致
    	int M = 10;
    	// redis自增key,用于负载均衡
    	String incrKey = "incrkey:delayqueue:business1";
    	// 每台机器执行时,从不同的zset中拉取消息,尽量确保不同机器访问不同zset
    	String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M);
    	while (true) {
    		// 此处逻辑和前面代码一致,省略。。。
    	}
    }
    Copier après la connexion

    上述方案和第一种方案的主要的不同点在于zsetKey的获取上,这里是根据负载均衡算法算出来的,确保每台机器访问不同zset并拉取消息,同时定时任务采用spring提供的进程内注解@Scheduled,集群内每台机器都会间隔5秒执行,因此相比之前的方案,能够较为明显地提升整个集群的吞吐量。

    但是这种方案的步骤相对更为复杂,需要动态配置队列数量,同时在创建延时任务时需要选择合适的消息ID字段来决定发送的目标zset队列,此处还要考虑均匀分布,整体实现要考虑的因素较多。

    上面一种方案已经能够较好地满足整体吞吐量要求,但其缺点是步骤相对复杂,因此项目中没有采用这种方案,而是采用下面一种也能满足吞吐量要求,步骤相对简单,又方便通用化的方案。

    该方案不使用定时任务,而是单独启动后台线程,在线程中执行永久循环,每次循环逻辑为:从目标zset中获取score值小于当前时间戳的元素集合中的score最小的那个元素,相当于获取当前时间点需要执行且执行时间点最早的那个任务,如果获取不到,表示当前时间点下暂无需要执行的任务,则线程休眠100ms(可视情况调整),否则,对获取到的元素进行处理,在分布式多线程环境下,仍然需要先删除成功才能进行处理。

    此外,考虑到每个线程获取元素后都需要再次访问redis尝试删除操作,为了避免多线程争抢浪费资源,降低效率,这里采用lua脚本将获取和删除操作原子化。lua脚本逻辑代码如下所示。

    local zsetKey = &#39;delayqueue&#39;
    local timestamp = 1614608094000
    local items = redis.call(&#39;zrangebyscore&#39;,zsetKey,0,timestamp,&#39;limit&#39;,0,1)
    if #items == 0 then
        return &#39;&#39;
    else
        redis.call(&#39;zremrangebyrank&#39;,zsetKey,0,0)
        return items[1]
    end
    Copier après la connexion

    其中timestamp为当前时间戳,通过在zrangebyscore命令中指定limit为1来获取score最小的元素,若获取不到,即结果集长度为0,则返回空字符串,否则,通过zremrangebyrank命令删除头部元素,即score最小的元素,也就是之前获取到的那个元素,由于redis内部保证lua脚本的原子性,上述获取并删除的操作能够运行无误。具体JAVA实现中还对其进行了多线程操作的封装和通用化的抽象,使不同业务都能够使用该组件实现延时队列。具体实现代码如下所示。

    /**
     * 基于ZSET实现消息延迟处理,score存储执行时间点,到达时间点即会向指定队列发送该消息;
     * 定义一个继承本类的bean即可;
     */
    public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean {
    	private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT;
    	static {
    		// 获取并删除的lua脚本,使用spring提供的api
    		String sb = "local items = redis.call(&#39;zrangebyscore&#39;,KEYS[1],0,ARGV[1],&#39;limit&#39;,0,1)\n" +
    				"if #items == 0 then\n" +
    				"\treturn &#39;&#39;\n" +
    				"else\n" +
    				"\tredis.call(&#39;zremrangebyrank&#39;,KEYS[1],0,0)\n" +
    				"\treturn items[1]\n" +
    				"end";
    		// 自有工具类,只要能创建出spring包下的 RedisScript 的实现类对象均可
    		TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class);
    	}
    	private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(),
    			0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix()));
    	private volatile boolean quit = false;
    	@Autowired
    	private StringRedisTemplate stringRedisTemplate;
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	@PostConstruct
    	public void startScan() {
    		// bean构建完成后,启动若干执行线程
    		int threadNum = getThreadNum();
    		for (int i = 0; i < threadNum; i++) {
    			EXECUTOR.execute(this);
    		}
    	}
    	@Override
    	public void run() {
    		while (!quit) {
    			try {
    				// 循环,采用lua获取当前需要执行的任务并将其从redis中删除
    				String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT,
    						Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis()));
    				if (StringUtils.isNotBlank(msg)) {
    					// 消息不为空,表示获取任务成功,将其再发送到指定MQ异步处理,将“获取任务”与“执行任务”分离解耦
    					rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg);
    				} else {
    					// 获取不到任务,表示当前时间点下暂无需要执行的任务,则线程休眠1S(可视情况调整)
    					SleepUtils.sleepSeconds(1);
    				}
    			} catch (Exception e) {
    				Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e);
    			}
    		}
    	}
    	@Override
    	public void destroy() throws Exception {
    		quit = true;
    	}
    	public void setQuit(boolean quit) {
    		this.quit = quit;
    	}
    	/**
    	 * 获取消息的工作线程数量
    	 */
    	protected abstract int getThreadNum();
    	/**
    	 * 线程名称前缀,方便问题定位
    	 */
    	protected abstract String getThreadNamePrefix();
    	/**
    	 * 存放延迟消息的ZSET队列名
    	 */
    	protected abstract String getDelayedMsgSourceKey();
    	/**
    	 * 消息到达执行时间点时将其通过指定 exchange 发送到实时消费队列中
    	 */
    	protected abstract String getSendExchange();
    	/**
    	 * 消息到达执行时间点时将其通过指定 routingKey 发送到实时消费队列中
    	 */
    	protected abstract String getSendRoutingKey();
    }
    Copier après la connexion

    在具体业务应用中,只需定义一个继承上述类的bean即可,需要实现的方法主要是提供一些配置,比如该业务对应的zset延时队列名称,同时工作拉取消息的线程数量,由于采用rabbitMq,因此这里需要提供exchangeroutingKey

    实际使用中只需向该zset队列中添加消息,并将score设为该任务需要执行的时间点(此处为13位毫秒时间戳),则到该时间点后,上述组件便会将该消息从zset中取出并删除,再将其通过指定的路由发送到实时MQ消费队列中,由消费者负责执行任务业务逻辑。目前该组件在项目中正常平稳运行。

    注意:

    本文结合项目中的实际需求介绍了延时队列的应用场景,分析了延时队列的多种实现,重点讲述了利用redis实现延时队列的原理,对其实现方案进行比较与优化,并将最终方案实际运用于项目需求中。

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal