Redis is an excellent key-value storage system that has many other uses besides being used as a cache. One of them is as an implementation tool for distributed scheduled tasks. In this article, we will introduce how to use Redis to implement distributed scheduled tasks and provide corresponding code examples.
In a stand-alone environment, we can use scheduled tasks to run a certain function or task regularly. In a distributed environment, each node will have its own scheduled tasks, and problems such as repeated execution and missed execution may occur. Therefore, distributed scheduled tasks need to consider issues such as task execution reliability, task distribution and coordination.
Redis provides some data structures and commands that can well support distributed scheduled tasks, such as:
Next, we will introduce how to use Redis to implement distributed scheduled tasks and provide code examples.
First, we need to store the task information in the Sorted Set of Redis. Here, we can have the task's execution time (timestamp) as the score and the task's ID as the member. The following is a sample code:
import redis # Connect to Redis redis_conn = redis.Redis(host='localhost', port=6379, db=0) # Add task to Sorted Set task_id = "task_001" execute_time = 1600000000 # timestamp (in seconds) redis_conn.zadd("tasks", {task_id: execute_time})
In the above code, we executed a task named task_001
, and the execution time was 1600000000
(here the timestamp is used In fact, it can also be expressed in other ways). Store it in a Sorted Set named tasks
.
In order to prevent expired tasks from always taking up space in Redis, we need to set the expiration time and delete them from the Sorted Set after expiration. The following is a sample code:
import time # Check for expired tasks every 10 seconds while True: # Get all tasks with score less than current time tasks = redis_conn.zrangebyscore("tasks", 0, int(time.time())) # Delete expired tasks for task in tasks: redis_conn.zrem("tasks", task)
In the above code, we check and delete expired tasks every 10 seconds. To do this, we use the zrangebyscore
command to get the score between 0
(that is, the current time) and time.time()
(the current timestamp) Task. After obtaining the task, we used the zrem
command to delete the task from the Sorted set.
When checking expired tasks, we also need to execute these expired tasks at the same time. The following is a sample code:
import uuid # Consume tasks every 10 seconds while True: # Get all tasks with score less than current time tasks = redis_conn.zrangebyscore("tasks", 0, int(time.time())) # Execute tasks for task in tasks: # Check if task is already being executed by another worker lock_id = redis_conn.get("lock_" + task) if lock_id is None: # Lock task using Lua script lock_id = str(uuid.uuid4()) lua_script = """ if redis.call("get", ARGV[1]) == false then redis.call("set", ARGV[1], ARGV[2]) redis.call("expire", ARGV[1], 60) return true else return false end """ if redis_conn.eval(lua_script, 0, "lock_" + task, lock_id) is True: # Execute task print("Executing task " + task) # task.execute() # ... # Remove task from Sorted Set and unlock redis_conn.zrem("tasks", task) redis_conn.delete("lock_" + task)
In the above code, we check and execute the expired task every 10 seconds. To do this, we use the zrangebyscore
command to get the score between 0
(that is, the current time) and time.time()
(the current timestamp) Task. After getting the task, we first check whether the task is being executed by another process. In order to avoid multiple processes executing the same task at the same time, we use a lock_id to identify whether the task has been locked. If the task is not locked, we use a Lua script to acquire the lock. After acquiring the lock, we perform the corresponding task operation, delete the task from the Sorted Set, and finally release the lock.
This article introduces how to use Redis to implement distributed scheduled tasks and provides corresponding code examples. By using Redis functions such as Sorted Set, expire command and Lua script, we can implement a highly reliable and efficient distributed scheduled task system. Of course, the above code still needs to be improved and optimized to meet different needs and scenarios.
The above is the detailed content of How to use Redis to implement distributed scheduled tasks. For more information, please follow other related articles on the PHP Chinese website!