Artikel ini membawakan anda pengetahuan yang berkaitan tentang Python, yang terutamanya memperkenalkan isu yang berkaitan dengan pelaksanaan tugas berjadual Anda boleh menggunakan pakej pihak ketiga untuk mengurus tugasan berjadual Secara relatifnya, apscheduler menggunakan Ia lebih mudah untuk gunakan. Mari kita lihat kaedah penggunaannya semoga bermanfaat kepada semua.
[Cadangan berkaitan: Tutorial video Python3]
Mari kita lihat contoh mudah Lihat cara apscheduler digunakan.
#encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, 测试apscheduler'.format(now)) task = BlockingScheduler() task.add_job(func=sch_test, trigger='cron', second='*/10') task.start()
Contoh di atas sangat mudah kita perlu mentakrifkan objek apscheduler, kemudian add_job untuk menambah tugasan, dan akhirnya memulakan tugas.
Contohnya ialah menjalankan tugasan sch_test setiap 10 saat Keputusan yang dijalankan adalah seperti berikut:
时间:2022-10-08 15:16:30, 测试apscheduler 时间:2022-10-08 15:16:40, 测试apscheduler 时间:2022-10-08 15:16:50, 测试apscheduler 时间:2022-10-08 15:17:00, 测试apscheduler
Jika kita ingin membawa parameter semasa melaksanakan fungsi tugasan, tambahkan sahaja args dalam. fungsi add_job, Contohnya, task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10').
Dalam contoh di atas, kita mempunyai pemahaman awal tentang cara menggunakan apschedulerl. Seterusnya, kita perlu mengetahui rangka kerja reka bentuk apscheduler. apscheduler mempunyai empat modul utama: pencetus, kedai_kerja, pelaksana dan penjadual.
1. Pencetus:
Pencetus merujuk kepada kaedah pencetus yang ditentukan oleh tugasan. Kita boleh memilih salah satu daripada cron, tarikh dan selang.
Cron mewakili tugas berjadual, serupa dengan linux crontab, yang dicetuskan pada masa tertentu.
Parameter yang tersedia adalah seperti berikut:
Selain itu, kami juga boleh menggunakan jenis ekspresi untuk menetapkan cron. Contohnya, yang biasa digunakan ialah:
Contoh penggunaan, dilaksanakan sekali setiap hari pada 7:20:
task.add_job(func=sch_test, args =( 'Tugas bermasa',), trigger='cron',
hour='7', minute='20')
tarikh mewakili tugasan sekali khusus untuk sesuatu masa;
Contoh penggunaan:
# 使用run_date指定运行时间 task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30)) # 或者用next_run_time task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
selang mewakili tugas kitaran, menentukan selang dan melaksanakannya sekali setiap selang.
selang boleh menetapkan parameter berikut:
Contoh penggunaan, laksanakan tugas sch_test setiap 3 saat:
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。
Mari kita ambil contoh untuk menggunakan ketiga-tiga pencetus:
# encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) task = BlockingScheduler() task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3)) task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3) task.start()
Cetak beberapa hasil:
时间:2022-10-08 15:45:49, 一次性任务测试apscheduler 时间:2022-10-08 15:45:49, 循环任务测试apscheduler 时间:2022-10-08 15:45:50, 定时任务测试apscheduler 时间:2022-10-08 15:45:52, 循环任务测试apscheduler 时间:2022-10-08 15:45:55, 定时任务测试apscheduler 时间:2022-10-08 15:45:55, 循环任务测试apscheduler 时间:2022-10-08 15:45:58, 循环任务测试apscheduler
Melalui contoh kod dan paparan hasil, kita boleh mengetahui dengan jelas fungsi pencetus yang berbeza .
2. Memori tugasan job_stores
Seperti namanya, memori tugasan adalah tempat tugasan disimpan secara lalai . Kami juga boleh menyesuaikan kaedah penyimpanan, seperti menyimpan tugas dalam mysql. Terdapat beberapa pilihan di sini:
Biasanya lalai adalah untuk menyimpannya dalam memori, tetapi jika program gagal dan dimulakan semula, tugas akan ditarik dan dijalankan semula keperluan pelaksanaan adalah tinggi, anda boleh memilih kenangan lain.
Contoh penggunaan storan SQLAlchemyJobStore:
from apscheduler.schedulers.blocking import BlockingScheduler def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) sched = BlockingScheduler() # 使用mysql存储任务 sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') sched.start()
3. pindahkan tugas Letakkannya dalam kumpulan benang atau kumpulan proses untuk dijalankan. Terdapat beberapa pilihan:
Lainnya ialah ThreadPoolExecutor, dan yang biasa digunakan ialah pelaksana kumpulan benang dan proses. Jika aplikasi adalah operasi intensif CPU, ProcessPoolExecutor boleh digunakan untuk melaksanakannya.4. Penjadual penjadual
Penjadual tergolong dalam teras apscheduler Ia memainkan peranan untuk menyelaraskan keseluruhan sistem apscheduler, termasuk memori dan pelaksana , pencetus berjalan seperti biasa di bawah jadualnya. Terdapat beberapa penjadual:
Dalam tiada senario khusus, penjadual yang paling biasa digunakan ialah BlockingScheduler.Pemantauan pengecualian
Apabila ralat berlaku semasa tugas yang dijadualkan berjalan, mekanisme pemantauan perlu disediakan modul log untuk merekod maklumat ralat. Contoh penggunaan:
penggunaan enkapsulasi penjadualfrom apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging # logging日志配置打印格式及保存位置 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='sche.log', filemode='a') def log_listen(event): if event.exception : print ( '任务出错,报错信息:{}'.format(event.exception)) else: print ( '任务正常运行...' ) def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0) sched = BlockingScheduler() # 使用mysql存储任务 sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') # 配置任务执行完成及错误时的监听 sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志监听 sched._logger = logging sched.start()
上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。 【相关推荐:Python3视频教程 】 Atas ialah kandungan terperinci Analisis terperinci pelaksanaan Python bagi tugas berjadual apscheduler. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
def init_logger(self, logger_name):
# 日志格式
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
log_obj = logging.getLogger(logger_name)
log_obj.setLevel(logging.INFO)
# 设置log存储位置
path = '/data/logs/'
filename = '{}{}.log'.format(path, logger_name)
if not os.path.exists(path):
os.makedirs(path)
# 设置日志按照时间分割
timeHandler = logging.handlers.TimedRotatingFileHandler(
filename,
when='D', # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周
interval=1, # 多少天切割一次
backupCount=10 # 保留几天
)
timeHandler.setLevel(logging.INFO)
timeHandler.setFormatter(formatter)
log_obj.addHandler(timeHandler)
return log_obj
class Scheduler(LoggerUtils):
def __init__(self):
# 执行器设置
executors = {
'default': ThreadPoolExecutor(10), # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10
'processpool': ProcessPoolExecutor(5) # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5
}
self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
# 存储器设置
# 这里使用sqlalchemy存储器,将任务存储在mysql
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
def log_listen(event):
if event.exception:
# 日志记录
self.scheduler._logger.error(event.traceback)
# 配置任务执行完成及错误时的监听
self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 配置日志监听
self.scheduler._logger = self.init_logger('sche_test')
def add_job(self, *args, **kwargs):
"""添加任务"""
self.scheduler.add_job(*args, **kwargs)
def start(self):
"""开启任务"""
self.scheduler.start()
# 测试任务
def sch_test(job_type):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('时间:{}, {}测试apscheduler'.format(now, job_type))
print(1/0)
# 添加任务,开启任务
sched = Scheduler()
# 添加任务
sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')
# 开启任务
sched.start()