Python怎么用sched模块实现定时任务
牛刀小试
我们先来看下面这个案例,代码如下
import sched import time def say_hello(name): print(f"Hello, world, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 1, say_hello, ("张三", )) scheduler.run()
那么上述的代码中,第一步首先则是实例化一个定时器,通过如下的代码
import sched scheduler = sched.scheduler()
接下来我们通过enter()
方法来执行定时任务的操作,其中的参数分别是延迟的时间、任务的优先级以及具体的执行函数和执行函数中的参数。像如上的代码就会在延迟5秒钟之后执行say_hello()
函数
当然要是延迟的时间相等的时候,我们可以设置任务执行的优先级来指定函数方法运行的顺序,例如有如下的代码
import sched import time def say_hello(name): print(f"Hello, world, {name}") def say_hello_2(name): print(f"Hello, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 2, say_hello, ("张三", )) scheduler.enter(5, 1, say_hello_2, ("李四", )) scheduler.run()
如上述代码,尽管延迟的时间都是一样的,但是say_hello()
方法的优先级明显要比say_hello_2()
方法要低一些,因此后者会优先执行。
进阶使用
除了让函数延迟执行,我们还可以让其重复执行,具体这样来操作,代码如下
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) def repeat_task(): scheduler.enter(5, 1, say_hello, ()) scheduler.enter(5, 1, repeat_task, ()) repeat_task() scheduler.run()
这里我们新建了一个repeat_task()
自定义函数,调用了scheduler.enter()
方法5秒钟执行一次之前定义的say_hello()
函数
在固定时间执行任务
同时我们还可以让任务在指定的时间执行,这里用到scheduler.entertabs()
方法,代码如下
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 指定时间执行任务 specific_time = time.time() + 5 # 距离现在的5秒钟之后执行 scheduler.enterabs(specific_time, 1, say_hello, ()) scheduler.run()
我们传入其中参数使其在指定的时间,也就是距离当下的5秒钟之后来执行任务
执行多个任务
这里仍然是调用enter()
方法来运行多个任务,代码如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任务一在两秒钟只有执行 scheduler.enter(2, 1, task_one, ()) # 任务二在五秒钟之后运行 scheduler.enter(5, 1, task_two, ()) scheduler.run()
这里定义了两个函数,task_one
和task_two
里面分是同样的执行逻辑,打印出“Hello, world!”,然后task_one()
是在两秒钟之后执行而task_two()
则是在5秒钟之后执行,两者执行的优先级都是一样的。
以不同的优先级执行不同的任务
这回我们给task_one()
和task_two()
赋予不同的优先级,看一看执行的结果如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 优先级是1 scheduler.enter(2, 2, task_one, ()) # 优先级是2 scheduler.enter(5, 1, task_two, ()) scheduler.run()
output
Task One - Hello, world!
Task Two - Hello, world!
上述的代码会在停顿两秒之后运行task_one()
函数,再停顿3秒之后执行task_two()
函数
定时任务加上取消方法
我们给定时任务添加上取消的方法,代码如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任务一在两秒钟只有执行 task_one_event = scheduler.enter(2, 1, task_one, ()) # 任务二在五秒钟之后运行 task_two_event = scheduler.enter(5, 1, task_two, ()) # 取消执行task_one scheduler.cancel(task_one_event) scheduler.run()
我们将两秒钟之后执行的task_one()
方法给取消掉,最后就只执行了task_two()
方法,也就打印出来“Task Two - Hello, world!”
执行备份程序
我们来写一个备份的脚本,在每天固定的时间将文件备份,代码如下
import sched import time import shutil def backup_files(): source = '路径/files' destination = '路径二' shutil.copytree(source, destination) def schedule_backup(): # 创建新的定时器 scheduler = sched.scheduler(time.time, time.sleep) # 备份程序在每天的1点来执行 backup_time = time.strptime('01:00:00', '%H:%M:%S') backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ()) # 开启定时任务 scheduler.run() schedule_backup()
我们通过shutil
模块当中的copytree()
方法来执行拷贝文件,然后在每天的1点准时执行
执行定时分发邮件的程序
最后我们来执行定时分发邮件的程序,代码如下
import sched import time import smtplib from email.mime.text import MIMEText def send_email(subject, message, from_addr, to_addr, smtp_server): # 邮件的主体信息 email = MIMEText(message) email['Subject'] = subject email['From'] = from_addr email['To'] = to_addr # 发邮件 with smtplib.SMTP(smtp_server) as server: server.send_message(email) def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time): # 创建定时任务的示例 scheduler = sched.scheduler(time.time, time.sleep) # 定时邮件 scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server)) # 开启定时器 scheduler.run() subject = 'Test Email' message = 'This is a test email' from_addr = 'test@example.com' to_addr = 'test@example.com' smtp_server = 'smtp.test.com' scheduled_time = time.time() + 60 # 一分钟之后执行程序 send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)
以上是Python怎么用sched模块实现定时任务的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

MySQL 有免费的社区版和收费的企业版。社区版可免费使用和修改,但支持有限,适合稳定性要求不高、技术能力强的应用。企业版提供全面商业支持,适合需要稳定可靠、高性能数据库且愿意为支持买单的应用。选择版本时考虑的因素包括应用关键性、预算和技术技能。没有完美的选项,只有最合适的方案,需根据具体情况谨慎选择。

文章介绍了MySQL数据库的上手操作。首先,需安装MySQL客户端,如MySQLWorkbench或命令行客户端。1.使用mysql-uroot-p命令连接服务器,并使用root账户密码登录;2.使用CREATEDATABASE创建数据库,USE选择数据库;3.使用CREATETABLE创建表,定义字段及数据类型;4.使用INSERTINTO插入数据,SELECT查询数据,UPDATE更新数据,DELETE删除数据。熟练掌握这些步骤,并学习处理常见问题和优化数据库性能,才能高效使用MySQL。

MySQL安装失败的原因主要有:1.权限问题,需以管理员身份运行或使用sudo命令;2.依赖项缺失,需安装相关开发包;3.端口冲突,需关闭占用3306端口的程序或修改配置文件;4.安装包损坏,需重新下载并验证完整性;5.环境变量配置错误,需根据操作系统正确配置环境变量。解决这些问题,仔细检查每个步骤,就能顺利安装MySQL。

MySQL数据库性能优化指南在资源密集型应用中,MySQL数据库扮演着至关重要的角色,负责管理海量事务。然而,随着应用规模的扩大,数据库性能瓶颈往往成为制约因素。本文将探讨一系列行之有效的MySQL性能优化策略,确保您的应用在高负载下依然保持高效响应。我们将结合实际案例,深入讲解索引、查询优化、数据库设计以及缓存等关键技术。1.数据库架构设计优化合理的数据库架构是MySQL性能优化的基石。以下是一些核心原则:选择合适的数据类型选择最小的、符合需求的数据类型,既能节省存储空间,又能提升数据处理速度

MySQL性能优化需从安装配置、索引及查询优化、监控与调优三个方面入手。1.安装后需根据服务器配置调整my.cnf文件,例如innodb_buffer_pool_size参数,并关闭query_cache_size;2.创建合适的索引,避免索引过多,并优化查询语句,例如使用EXPLAIN命令分析执行计划;3.利用MySQL自带监控工具(SHOWPROCESSLIST,SHOWSTATUS)监控数据库运行状况,定期备份和整理数据库。通过这些步骤,持续优化,才能提升MySQL数据库性能。

MySQL 可在无需网络连接的情况下运行,进行基本的数据存储和管理。但是,对于与其他系统交互、远程访问或使用高级功能(如复制和集群)的情况,则需要网络连接。此外,安全措施(如防火墙)、性能优化(选择合适的网络连接)和数据备份对于连接到互联网的 MySQL 数据库至关重要。

直接通过 Navicat 查看 MongoDB 密码是不可能的,因为它以哈希值形式存储。取回丢失密码的方法:1. 重置密码;2. 检查配置文件(可能包含哈希值);3. 检查代码(可能硬编码密码)。

HadiDB:轻量级、高水平可扩展的Python数据库HadiDB(hadidb)是一个用Python编写的轻量级数据库,具备高度水平的可扩展性。安装HadiDB使用pip安装:pipinstallhadidb用户管理创建用户:createuser()方法创建一个新用户。authentication()方法验证用户身份。fromhadidb.operationimportuseruser_obj=user("admin","admin")user_obj.
