Comment Redis implémente la file d'attente différée
Utiliser
Configuration des dépendances
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.12.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.homeey</groupId> <artifactId>redis-delay-queue</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redis-delay-queue</name> <description>redis-delay-queue</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.19.3</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-data-23</artifactId> <version>3.19.3</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
Remarque : Gestion des problèmes de compatibilité avec Redisson et Springboot# 🎜 🎜#
Fichier de configurationIl existe trois façons d'intégrer Redisson avec Springboot- La première : configuration générale de Redis + Redisson Configuration automatique [la plus simple]
- Deuxième : utilisez un fichier de configuration Redisson séparé
- Troisième type : utilisez le Clé de configuration spring.redis.redisson à configurer
spring: redis: database: 0 host: localhost port: 6379 timeout: 10000 lettuce: pool: max-active: 8 max-wait: -1 min-idle: 0 max-idle: 8
package com.homeey.redisdelayqueue.delay; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 明天的你会因今天到的努力而幸运 * * @author jt4mrg@qq.com * 23:11 2023-02-19 2023 **/ @Slf4j @Component @RequiredArgsConstructor public class RedissonDelayQueue { private final RDelayedQueue<String> delayedQueue; private final RBlockingQueue<String> blockingQueue; @PostConstruct public void init() { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.submit(() -> { while (true) { try { String task = blockingQueue.take(); log.info("rev delay task:{}", task); } catch (Exception e) { log.error("occur error", e); } } }); } public void offerTask(String task, long seconds) { log.info("add delay task:{},delay time:{}s", task, seconds); delayedQueue.offer(task, seconds, TimeUnit.SECONDS); } @Configuration static class RedissonDelayQueueConfigure { @Bean public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) { return redissonClient.getBlockingQueue("TOKEN-RENEWAL"); } @Bean public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue, RedissonClient redissonClient) { return redissonClient.getDelayedQueue(blockingQueue); } } }
Effet d'exécution
Analyse des principes
Implémenté à partir de RedissonDelayedQueue
Nous voyons quatre caractères
# 🎜🎜#
RedissonDelayedQueue
实现中我们看到有四个角色
redisson_delay_queue_timeout:xxx,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
redisson_delay_queue:xxx,list数据类型,暂时没发现什么用,只是在提交任务时会写入这里面,队列转移时又会删除里面的元素
xxx:list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
redisson_delay_queue_channel:xxx,是一个channel,用来通知客户端开启一个延迟任务
队列创建
RedissonDelayedQueue
延迟队列创建时,指定了队列转移服务,以及实现延迟队列的四个重要校色的key。核心代码是指定队列转移任务
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) { @Override protected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "//拿到zset中过期的值列表 + "if #expiredValues > 0 then " //如果有 + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);"//解构消息,在提交任务时打包的消息 + "redis.call('rpush', KEYS[1], value);" //放入无前缀的list 队头 + "redis.call('lrem', KEYS[3], 1, v);"//移除带前缀list 队尾元素 + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" //移除zset中本次读取的过期元素 + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "//取zset最小分值的元素 + "if v[1] ~= nil then " + "return v[2]; " //返回分值,即过期时间 + "end " + "return nil;", Arrays.asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic getTopic() { return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); } };
生产者
核心代码RedissonDelayedQueue#offerAsync
redisson_delay_queue_timeout:xxx, type de données défini trié, stocke toutes les tâches retardées en fonction de l'heure d'expiration des tâches retardées Triées par timbre (horodatage lorsque soumission de la tâche + délai), le premier élément en début de liste est donc la première tâche à être exécutée dans toute la file d'attente. Ce concept est très important
#🎜. 🎜#redisson_delay_queue:xxx, type de données liste, je n'ai trouvé aucune utilité pour le moment, mais il sera écrit ici lors de la soumission de la tâche, et les éléments à l'intérieur seront supprimés lorsque la file d'attente sera transféré# 🎜🎜##🎜🎜##🎜🎜##🎜🎜#xxx : type de données de liste, appelé file d'attente cible. Les tâches stockées dans cette file d'attente ont atteint le délai et peuvent être consommées. . La tâche obtenue par l'utilisateur, donc la méthode take de RBlockingQueue dans la démo ci-dessus obtient la tâche de cette file d'attente cible #🎜🎜##🎜🎜##🎜🎜##🎜🎜#redisson_delay_queue_channel:xxx , est un canal utilisé pour notifier au client de démarrer une tâche retardée#🎜🎜##🎜🎜##🎜🎜##🎜🎜#Queue Creation#🎜🎜##🎜🎜#
RedissonDelayedQueue
Delay file d'attente Lors de la création, le service de transfert de file d'attente et les quatre clés de correction des couleurs importantes pour la mise en œuvre de la file d'attente retardée sont spécifiés. Le code principal consiste à spécifier la tâche de transfert de file d'attente#🎜🎜#return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" //打包消息体:消息id,消息长度,消息值 + "redis.call('zadd', KEYS[2], ARGV[1], value);"//zset中加入消息及其超时分值 + "redis.call('rpush', KEYS[3], value);" //向带前缀的list中添加消息 // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "//取出zset中第一个元素 + "if v[1] == value then " //如果最快过期的元素就是这次发送的消息 + "redis.call('publish', KEYS[4], ARGV[1]); " //channel中发布一下超时时间 + "end;", Arrays.asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));

RedissonDelayedQueue#offerAsync
#🎜🎜#rrreee#🎜🎜#consumer#🎜 🎜 ##🎜🎜#Le moyen le plus simple pour les consommateurs est de simplement lire BLPOP dans la liste sans préfixe#🎜🎜#Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

AI Hentai Generator
Générez AI Hentai gratuitement.

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Le mode Redis Cluster déploie les instances Redis sur plusieurs serveurs grâce à la rupture, à l'amélioration de l'évolutivité et de la disponibilité. Les étapes de construction sont les suivantes: Créez des instances de redis étranges avec différents ports; Créer 3 instances Sentinel, Moniteur Redis Instances et basculement; Configurer les fichiers de configuration Sentinel, ajouter des informations d'instance Redis de surveillance et des paramètres de basculement; Configurer les fichiers de configuration d'instance Redis, activer le mode de cluster et spécifier le chemin du fichier d'informations de cluster; Créer un fichier nœuds.conf, contenant des informations de chaque instance redis; Démarrez le cluster, exécutez la commande CREATE pour créer un cluster et spécifiez le nombre de répliques; Connectez-vous au cluster pour exécuter la commande d'informations de cluster pour vérifier l'état du cluster; faire

L'utilisation de la directive Redis nécessite les étapes suivantes: Ouvrez le client Redis. Entrez la commande (Verbe Key Value). Fournit les paramètres requis (varie de l'instruction à l'instruction). Appuyez sur Entrée pour exécuter la commande. Redis renvoie une réponse indiquant le résultat de l'opération (généralement OK ou -err).

Les étapes pour démarrer un serveur Redis incluent: Installez Redis en fonction du système d'exploitation. Démarrez le service Redis via Redis-Server (Linux / MacOS) ou Redis-Server.exe (Windows). Utilisez la commande redis-Cli Ping (Linux / MacOS) ou redis-Cli.exe Ping (Windows) pour vérifier l'état du service. Utilisez un client redis, tel que redis-cli, python ou node.js pour accéder au serveur.

L'utilisation des opérations Redis pour verrouiller nécessite l'obtention du verrouillage via la commande setnx, puis en utilisant la commande Expire pour définir le temps d'expiration. Les étapes spécifiques sont les suivantes: (1) Utilisez la commande setnx pour essayer de définir une paire de valeurs de clé; (2) Utilisez la commande Expire pour définir le temps d'expiration du verrou; (3) Utilisez la commande del pour supprimer le verrouillage lorsque le verrouillage n'est plus nécessaire.

Pour lire une file d'attente à partir de Redis, vous devez obtenir le nom de la file d'attente, lire les éléments à l'aide de la commande LPOP et traiter la file d'attente vide. Les étapes spécifiques sont les suivantes: Obtenez le nom de la file d'attente: Nommez-le avec le préfixe de "Fitre:" tel que "Fitre: My-Quyue". Utilisez la commande LPOP: éjectez l'élément de la tête de la file d'attente et renvoyez sa valeur, telle que la file d'attente LPOP: My-Queue. Traitement des files d'attente vides: si la file d'attente est vide, LPOP renvoie NIL et vous pouvez vérifier si la file d'attente existe avant de lire l'élément.

Pour afficher toutes les touches dans Redis, il existe trois façons: utilisez la commande Keys pour retourner toutes les clés qui correspondent au modèle spécifié; Utilisez la commande SCAN pour itérer les touches et renvoyez un ensemble de clés; Utilisez la commande info pour obtenir le nombre total de clés.

Redis utilise des tables de hachage pour stocker les données et prend en charge les structures de données telles que les chaînes, les listes, les tables de hachage, les collections et les collections ordonnées. Redis persiste les données via des instantanés (RDB) et ajoutez les mécanismes d'écriture uniquement (AOF). Redis utilise la réplication maître-esclave pour améliorer la disponibilité des données. Redis utilise une boucle d'événement unique pour gérer les connexions et les commandes pour assurer l'atomicité et la cohérence des données. Redis définit le temps d'expiration de la clé et utilise le mécanisme de suppression paresseux pour supprimer la clé d'expiration.

La meilleure façon de comprendre le code source redis est d'aller étape par étape: familiarisez-vous avec les bases de Redis. Sélectionnez un module ou une fonction spécifique comme point de départ. Commencez par le point d'entrée du module ou de la fonction et affichez le code ligne par ligne. Affichez le code via la chaîne d'appel de fonction. Familiez les structures de données sous-jacentes utilisées par Redis. Identifiez l'algorithme utilisé par Redis.
