
How to use the sched module in Python to implement scheduled tasks

WBOY
Release: 2023-04-18 14:38:17

A first example

Let's start with a simple example. The code is as follows:

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 1, say_hello, ("张三", ))
scheduler.run()

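The first step in the code above is to create a scheduler instance. Called without arguments, sched.scheduler() uses time.monotonic as its clock and time.sleep as its delay function (Python 3.3 and later):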

import sched

scheduler = sched.scheduler()

Next, we call the enter() method to schedule the task. Its parameters are the delay in seconds, the priority of the task, the function to execute, and a tuple of arguments for that function. In the first example above, say_hello() runs after a delay of 5 seconds.
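
As a minimal sketch of those parameters, the snippet below passes both positional and keyword arguments to a task. The greet() function, the calls list, and the short delays are assumptions made for illustration; only enter() and its argument and kwargs parameters come from the sched module itself.

```python
import sched
import time

calls = []  # hypothetical list that records what each task produced

def greet(name, greeting="Hello"):
    calls.append(f"{greeting}, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

# enter(delay, priority, action, argument=(), kwargs={})
scheduler.enter(0.1, 1, greet, argument=("Alice",))
scheduler.enter(0.2, 1, greet, argument=("Bob",), kwargs={"greeting": "Hi"})

scheduler.run()  # blocks until both tasks have run
print(calls)
```

Positional arguments always go in a tuple, which is why a single argument needs the trailing comma.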

When two tasks are due at the same time, the priority determines which one runs first. For example:

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

def say_hello_2(name):
    print(f"Hello, {name}")

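scheduler = sched.scheduler(time.time, time.sleep)

# enter() adds the delay to the current clock value on every call, so two
# enter() calls usually end up with slightly different due times.
# enterabs() with one shared timestamp lets the priority decide the order.
run_at = time.time() + 5
scheduler.enterabs(run_at, 2, say_hello, ("张三", ))
scheduler.enterabs(run_at, 1, say_hello_2, ("李四", ))
scheduler.run()

In the code above, both tasks are due at the same moment, but say_hello() has priority 2 while say_hello_2() has priority 1. A lower number means a higher priority, so say_hello_2() runs first. Two separate enter(5, ...) calls would not reliably behave this way: each call computes its own due time from the current clock, so the task entered first is usually due slightly earlier and runs first regardless of priority.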
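
To check the ordering rule without guessing, one option is to inspect the scheduler's queue attribute, which lists pending events in the order they will run. This is only a sketch: the labels, the order list, and the 0.1-second due time are assumptions chosen for illustration.

```python
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
order = []  # hypothetical list filled in as the tasks run

due = time.time() + 0.1  # one shared timestamp for all three tasks
scheduler.enterabs(due, 3, order.append, ("low",))
scheduler.enterabs(due, 1, order.append, ("high",))
scheduler.enterabs(due, 2, order.append, ("medium",))

# queue is read-only and sorted by due time, then by priority
planned = [event.argument[0] for event in scheduler.queue]

scheduler.run()
print(planned)
print(order)
```

The planned order and the actual run order should match, with the lowest priority number first.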

Advanced use

Besides delaying a function call, we can also make it run repeatedly. The code is as follows:

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

def repeat_task():
    scheduler.enter(5, 1, say_hello, ())
    scheduler.enter(5, 1, repeat_task, ())

repeat_task()
scheduler.run()
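
One caveat about the repeat_task() pattern: it reschedules itself every time, so scheduler.run() never returns. A bounded variant might look like the sketch below. The tick() function, its remaining counter, and the 0.05-second interval are assumptions for illustration, not part of the original example.

```python
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
ticks = []  # hypothetical record of each run

def tick(remaining, interval=0.05):
    ticks.append(remaining)
    if remaining > 1:
        # Re-arm only while runs remain, so scheduler.run() eventually returns
        scheduler.enter(interval, 1, tick, (remaining - 1,))

scheduler.enter(0.05, 1, tick, (3,))
scheduler.run()
print(ticks)
```

Passing the counter as an argument keeps the task free of global state beyond the scheduler itself.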

In the example above, repeat_task() is a custom function that calls scheduler.enter() twice: once to schedule the previously defined say_hello() function, and once to schedule itself again. As a result, say_hello() runs once every 5 seconds.

Execute tasks at a fixed time

We can also have a task run at a specified time by using the scheduler.enterabs() method. The code is as follows:

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# Run the task at a specific time
specific_time = time.time() + 5  # 5 seconds from now
scheduler.enterabs(specific_time, 1, say_hello, ())

scheduler.run()

We pass in an absolute timestamp, so the task runs at that specified time, which here is 5 seconds from now.
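
When the target time starts out as a datetime object rather than a raw timestamp, its timestamp() method gives a value on the same scale as time.time(). The sketch below assumes a scheduler created with time.time as its clock; the fired_at list and the 0.2-second offset are illustrative only.

```python
import datetime
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
fired_at = []  # hypothetical record of when the task actually ran

# A naive datetime is interpreted as local time by timestamp()
run_at = datetime.datetime.now() + datetime.timedelta(seconds=0.2)
scheduler.enterabs(run_at.timestamp(), 1, lambda: fired_at.append(time.time()))

scheduler.run()
print(fired_at[0] - run_at.timestamp())  # small lateness, in seconds
```

This only works if the scheduler's clock and the timestamp share a time base, so a scheduler built on time.monotonic would need a different conversion.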

Perform multiple tasks

Here we again call the enter() method, this time to schedule multiple tasks. The code is as follows:

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# Task one runs after two seconds
scheduler.enter(2, 1, task_one, ())

# Task two runs after five seconds
scheduler.enter(5, 1, task_two, ())

scheduler.run()

Two functions are defined here, task_one() and task_two(), with the same logic: each prints a "Hello, world!" message. task_one() runs after 2 seconds and task_two() runs after 5 seconds. Both tasks have the same priority.
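
Waiting several real seconds makes schedules like this slow to try out. Because the scheduler accepts any clock and delay function, one possible approach is to pass in a simulated clock. The FakeClock class below is hypothetical and exists only for this sketch; its sleep() advances a counter instead of pausing.

```python
import sched

class FakeClock:
    """Hypothetical simulated clock: sleeping just advances a counter."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds

clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
log = []

# Entered out of order on purpose: the due time decides the run order
scheduler.enter(5, 1, lambda: log.append(("task_two", clock.time())))
scheduler.enter(2, 1, lambda: log.append(("task_one", clock.time())))

scheduler.run()  # returns immediately, no real waiting
print(log)
```

The log shows task_one at simulated time 2.0 and task_two at 5.0, matching the delays passed to enter().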

Execute different tasks with different priorities

This time we give task_one() and task_two() different priorities. The code and its output are as follows:

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# Priority 2, runs after two seconds
scheduler.enter(2, 2, task_one, ())

# Priority 1, runs after five seconds
scheduler.enter(5, 1, task_two, ())

scheduler.run()

Output:

Task One - Hello, world!
Task Two - Hello, world!

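The code above runs task_one() after two seconds and then runs task_two() three seconds later, five seconds after the start. Because the two delays differ, the priorities do not change the order: priority only matters when tasks are due at the same time.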

Cancel a scheduled task

We can also cancel a task that has already been scheduled. The code is as follows:

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# Task one runs after two seconds
task_one_event = scheduler.enter(2, 1, task_one, ())

# Task two runs after five seconds
task_two_event = scheduler.enter(5, 1, task_two, ())

# Cancel task_one
scheduler.cancel(task_one_event)

scheduler.run()

We cancel task_one(), which would otherwise have run after two seconds, so only task_two() runs and prints "Task Two - Hello, world!".
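
cancel() takes the event object returned by enter() or enterabs(), and it raises ValueError if that event is no longer in the queue. The sketch below illustrates both behaviours; the labels, the ran list, and the short delays are assumptions for illustration.

```python
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
ran = []  # hypothetical record of which tasks actually ran

keep = scheduler.enter(0.1, 1, ran.append, ("kept",))
drop = scheduler.enter(0.1, 1, ran.append, ("dropped",))

scheduler.cancel(drop)
pending = len(scheduler.queue)  # only the kept event remains

try:
    scheduler.cancel(drop)  # already removed from the queue
    second_cancel_failed = False
except ValueError:
    second_cancel_failed = True

scheduler.run()
print(ran, pending, second_cancel_failed)
```

Keeping the returned event object around is therefore the only way to cancel a task later.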

Run a backup program

Let's write a backup script that backs up files at a fixed time every day. The code is as follows:

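import datetime
import sched
import shutil
import time

scheduler = sched.scheduler(time.time, time.sleep)

def next_backup_time():
    # Next occurrence of 01:00 local time, as a Unix timestamp.
    # (Parsing '01:00:00' with time.strptime() yields a date in 1900,
    # which is not a usable timestamp for enterabs().)
    now = datetime.datetime.now()
    target = now.replace(hour=1, minute=0, second=0, microsecond=0)
    if target <= now:
        target += datetime.timedelta(days=1)
    return target.timestamp()

def backup_files():
    source = 'path/files'
    destination = 'path_two'
    # dirs_exist_ok (Python 3.8+) lets repeated backups reuse the destination
    shutil.copytree(source, destination, dirs_exist_ok=True)
    # Schedule the next day's backup
    scheduler.enterabs(next_backup_time(), 1, backup_files, ())

def schedule_backup():
    # The backup runs at 1:00 every day
    scheduler.enterabs(next_backup_time(), 1, backup_files, ())

    # Start the scheduler
    scheduler.run()

schedule_backup()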

We use the copytree() function from the shutil module to copy the files, and the backup is scheduled to run at 1:00 every day.
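
A long wait like this blocks the whole process inside scheduler.run(). If the program has other work to do, one option is run(blocking=False), which runs only the events that are already due and returns the number of seconds until the next one, or None when the queue is empty. The sketch below uses a 0.3-second delay in place of the real backup time; the done list and the label are illustrative assumptions.

```python
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
done = []  # hypothetical record of completed tasks

scheduler.enter(0.3, 1, done.append, ("backup",))

# Nothing is due yet, so this returns the time left until the next event
wait = scheduler.run(blocking=False)
first_check = list(done)

time.sleep(0.4)  # stand-in for other work done by the program

# The event is now due: it runs, and the empty queue yields None
remaining = scheduler.run(blocking=False)
print(first_check, done, remaining)
```

A real program would call run(blocking=False) from its own main loop rather than sleeping for a fixed time.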

Send an email at a scheduled time

Finally, let's write a program that sends an email at a scheduled time. The code is as follows:

import sched
import time
import smtplib
from email.mime.text import MIMEText

def send_email(subject, message, from_addr, to_addr, smtp_server):
    # Build the email message
    email = MIMEText(message)
    email['Subject'] = subject
    email['From'] = from_addr
    email['To'] = to_addr

    # Send the email
    with smtplib.SMTP(smtp_server) as server:
        server.send_message(email)

def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time):
    # Create a scheduler instance
    scheduler = sched.scheduler(time.time, time.sleep)

    # Schedule the email
    scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server))

    # Start the scheduler
    scheduler.run()

subject = 'Test Email'
message = 'This is a test email'
from_addr = 'test@example.com'
to_addr = 'test@example.com'
smtp_server = 'smtp.test.com'

scheduled_time = time.time() + 60  # Run one minute from now
send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)
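
Network calls such as sending mail can fail, and an exception raised by a task propagates out of scheduler.run(), which leaves any later tasks unrun until run() is called again. One way to guard against that, sketched here under assumptions, is to wrap each action. The safe() wrapper, the two stand-in send functions, and the addresses are hypothetical and do not contact any mail server.

```python
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)
errors = []  # hypothetical record of failures
sent = []    # hypothetical record of successful sends

def safe(action):
    """Wrap a task so that its exceptions are recorded instead of raised."""
    def wrapper(*args, **kwargs):
        try:
            action(*args, **kwargs)
        except Exception as exc:
            errors.append(repr(exc))
    return wrapper

def failing_send(to_addr):
    # Stand-in for a send that cannot reach its server
    raise ConnectionError(f"cannot reach server for {to_addr}")

def ok_send(to_addr):
    # Stand-in for a send that succeeds
    sent.append(to_addr)

scheduler.enter(0.05, 1, safe(failing_send), ("a@example.com",))
scheduler.enter(0.1, 1, safe(ok_send), ("b@example.com",))

scheduler.run()
print(errors, sent)
```

The second task still runs even though the first one failed, because the wrapper keeps the exception from reaching the scheduler.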

source:yisu.com