Redis est un excellent système de stockage clé-valeur qui a de nombreuses autres utilisations en plus d'être utilisé comme cache. L'un d'eux est un outil de mise en œuvre pour les tâches planifiées distribuées. Dans cet article, nous présenterons comment utiliser Redis pour implémenter des tâches planifiées distribuées et fournirons des exemples de code correspondants.
Dans un environnement autonome, nous pouvons utiliser des tâches planifiées pour exécuter régulièrement une certaine fonction ou tâche. Dans un environnement distribué, chaque nœud aura ses propres tâches planifiées et des problèmes tels qu'une exécution répétée et une exécution manquée peuvent survenir. Par conséquent, les tâches planifiées distribuées doivent prendre en compte des problèmes tels que la fiabilité de l’exécution des tâches, la répartition et la coordination des tâches.
Redis fournit des structures de données et des commandes qui peuvent bien prendre en charge les tâches planifiées distribuées, telles que :
Ensuite, nous présenterons comment utiliser Redis pour implémenter des tâches planifiées distribuées et fournirons des exemples de code.
Tout d'abord, nous devons stocker les informations sur les tâches dans l'ensemble trié de Redis. Ici, nous pouvons avoir l'heure d'exécution de la tâche (horodatage) comme score et l'ID de la tâche en tant que membre. Voici un exemple de code :
import redis # Connect to Redis redis_conn = redis.Redis(host='localhost', port=6379, db=0) # Add task to Sorted Set task_id = "task_001" execute_time = 1600000000 # timestamp (in seconds) redis_conn.zadd("tasks", {task_id: execute_time})
Dans le code ci-dessus, nous avons exécuté une tâche nommée task_001
, et le temps d'exécution était 1600000000
(ici exprimé par timestamp , en fait cela peut aussi s'exprimer d'autres manières). Stockez-le dans un ensemble trié nommé tâches
. task_001
的任务,执行时间为1600000000
(这里是用时间戳来表示的,实际上也可以使用其他方式来表示)。将它存入名为tasks
的Sorted Set中。
为了避免过期任务一直存在Redis中占用空间,我们需要设置过期时间,并在过期后从Sorted Set中删除。下面是一个示例代码:
import time # Check for expired tasks every 10 seconds while True: # Get all tasks with score less than current time tasks = redis_conn.zrangebyscore("tasks", 0, int(time.time())) # Delete expired tasks for task in tasks: redis_conn.zrem("tasks", task)
以上代码中,我们每隔10秒检查一次过期任务并删除。为此,我们使用了zrangebyscore
命令,获取分数在0
(即当前时间) 至 time.time()
(当前时间戳)之间的任务。在获取到任务后,我们使用了zrem
命令,从Sorted set中删除任务。
在检查过期任务时,我们同时也要执行这些过期任务。下面是一个示例代码:
import uuid # Consume tasks every 10 seconds while True: # Get all tasks with score less than current time tasks = redis_conn.zrangebyscore("tasks", 0, int(time.time())) # Execute tasks for task in tasks: # Check if task is already being executed by another worker lock_id = redis_conn.get("lock_" + task) if lock_id is None: # Lock task using Lua script lock_id = str(uuid.uuid4()) lua_script = """ if redis.call("get", ARGV[1]) == false then redis.call("set", ARGV[1], ARGV[2]) redis.call("expire", ARGV[1], 60) return true else return false end """ if redis_conn.eval(lua_script, 0, "lock_" + task, lock_id) is True: # Execute task print("Executing task " + task) # task.execute() # ... # Remove task from Sorted Set and unlock redis_conn.zrem("tasks", task) redis_conn.delete("lock_" + task)
以上代码中,我们每隔10秒检查一次过期任务并执行。为此,我们使用了zrangebyscore
命令,获取分数在0
(即当前时间) 至 time.time()
rrreee
Dans le code ci-dessus, nous vérifions les tâches expirées toutes les 10 secondes et les supprimons. Pour ce faire, nous utilisons la commandezrangebyscore
pour obtenir le score entre 0
(c'est-à-dire l'heure actuelle) et time.time()
(l'heure actuelle). horodatage actuel) entre les tâches. Après avoir obtenu la tâche, nous avons utilisé la commande zrem
pour supprimer la tâche de l'ensemble trié. 🎜🎜3. Exécuter des tâches🎜🎜Lors de la vérification des tâches expirées, nous devons également exécuter ces tâches expirées en même temps. Voici un exemple de code : 🎜rrreee🎜Dans le code ci-dessus, nous vérifions les tâches expirées toutes les 10 secondes et les exécutons. Pour ce faire, nous utilisons la commande zrangebyscore
pour obtenir le score entre 0
(c'est-à-dire l'heure actuelle) et time.time()
(l'heure actuelle). horodatage actuel) entre les tâches. Après avoir obtenu la tâche, nous vérifions d'abord si la tâche est exécutée par un autre processus. Afin d'éviter que plusieurs processus exécutent la même tâche en même temps, nous utilisons un lock_id pour identifier si la tâche a été verrouillée. Si la tâche n'est pas verrouillée, nous utilisons un script Lua pour acquérir le verrou. Après avoir acquis le verrou, nous effectuons l'opération de tâche correspondante, supprimons la tâche de l'ensemble trié et enfin libérons le verrou. 🎜🎜Résumé🎜🎜Cet article explique comment utiliser Redis pour implémenter des tâches planifiées distribuées et fournit des exemples de code correspondants. En utilisant les fonctions Redis telles que Sorted Set, la commande expire et le script Lua, nous pouvons implémenter un système de tâches planifiées distribuées hautement fiable et efficace. Bien entendu, le code ci-dessus doit encore être amélioré et optimisé pour répondre aux différents besoins et scénarios. 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!