이 글에서는 예약된 작업 구현과 관련된 문제를 주로 소개하는 Python에 대한 관련 지식을 제공합니다. 타사 패키지를 사용하여 예약된 작업을 관리하는 것이 상대적으로 더 쉽습니다. 사용된 방법을 살펴보는 것이 모든 사람에게 도움이 되기를 바랍니다.
【관련 추천: Python3 동영상 튜토리얼】
apscheduler가 어떻게 사용되는지 간단한 예를 들어 보겠습니다.
#encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, 测试apscheduler'.format(now)) task = BlockingScheduler() task.add_job(func=sch_test, trigger='cron', second='*/10') task.start()
위의 예는 매우 간단합니다. 먼저 apscheduler 개체를 정의한 다음 add_job을 사용하여 작업을 추가하고 마지막으로 작업을 시작해야 합니다.
예제는 sch_test 작업을 10초마다 실행하는 것입니다. 실행 결과는 다음과 같습니다.
时间:2022-10-08 15:16:30, 测试apscheduler 时间:2022-10-08 15:16:40, 测试apscheduler 时间:2022-10-08 15:16:50, 测试apscheduler 时间:2022-10-08 15:17:00, 测试apscheduler
작업 함수를 실행할 때 매개변수를 전달하려면 task.add_job(과 같이 add_job 함수에 args를 추가하면 됩니다. func=sch_test, args =('a'), 트리거='cron', 두 번째='*/10').
위의 예에서 apschedulerl을 사용하는 방법에 대한 사전 이해가 있었습니다. 다음으로 apscheduler의 설계 프레임워크를 알아야 합니다. apscheduler에는 트리거, job_stores, 실행기 및 스케줄러의 네 가지 주요 모듈이 있습니다.
1. 트리거:
트리거는 작업에서 지정한 트리거 방법을 참조하며, 예제에서는 "cron" 방법을 사용합니다. cron, 날짜, 간격 중 하나를 선택할 수 있습니다.
Cron은 Linux crontab과 유사하게 지정된 시간에 트리거되는 예약된 작업을 나타냅니다.
사용 가능한 매개변수는 다음과 같습니다.
또한 표현식 유형을 사용하여 cron을 설정할 수도 있습니다. 예를 들어, 일반적으로 사용되는 것은 다음과 같습니다:
사용 예, 매일 7시 20분에 한 번 실행:
task.add_job(func=sch_test, args=('Schedule task',), Trigger='cron' ,
hour='7', Minute='20')
date는 특정 시간에 특정한 일회성 작업을 나타냅니다.
사용 예:
# 使用run_date指定运行时间 task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30)) # 或者用next_run_time task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
interval은 간격 시간을 지정하는 주기적 작업을 나타냅니다. time 간격마다 한 번씩 실행됩니다.
interval은 다음 매개변수를 설정할 수 있습니다.
사용 예, 3초마다 sch_test 작업 실행:
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。
세 가지 트리거를 모두 사용하는 예를 들어보겠습니다.
# encoding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) task = BlockingScheduler() task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3)) task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3) task.start()
일부 결과 인쇄:
时间:2022-10-08 15:45:49, 一次性任务测试apscheduler 时间:2022-10-08 15:45:49, 循环任务测试apscheduler 时间:2022-10-08 15:45:50, 定时任务测试apscheduler 时间:2022-10-08 15:45:52, 循环任务测试apscheduler 时间:2022-10-08 15:45:55, 定时任务测试apscheduler 时间:2022-10-08 15:45:55, 循环任务测试apscheduler 时间:2022-10-08 15:45:58, 循环任务测试apscheduler
코드를 통해 예제와 결과 표시를 통해 다양한 트리거 사용의 차이점을 명확하게 이해할 수 있습니다.
2. 태스크 메모리 job_stores
이름에서 알 수 있듯이 태스크 메모리는 태스크가 저장되는 곳으로 기본적으로 메모리에 저장됩니다. mysql에 작업을 저장하는 등 저장 방법을 사용자 정의할 수도 있습니다. 여기에는 몇 가지 옵션이 있습니다.
일반적으로 기본값은 메모리에 저장하는 것이지만 프로그램이 실패하고 다시 시작되면 작업을 가져와서 다시 실행합니다. 작업 실행에 대한 요구 사항이 높으면 다음을 수행할 수 있습니다. 다른 저장소를 선택하세요.
SQLAlchemyJobStore 저장소 사용 예:
from apscheduler.schedulers.blocking import BlockingScheduler def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) sched = BlockingScheduler() # 使用mysql存储任务 sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') sched.start()
3. 실행자 executor
실행자의 기능은 실행할 작업을 스레드 풀이나 프로세스 풀에 넣는 것입니다. 여러 가지 옵션이 있습니다:
기본값은 ThreadPoolExecutor이고 일반적으로 사용되는 옵션은 스레드 및 프로세스 풀 실행기입니다. 애플리케이션이 CPU 집약적인 작업인 경우 ProcessPoolExecutor를 사용하여 실행할 수 있습니다.
4. Schedulers 스케줄러
스케줄러는 apscheduler 시스템 전체를 조정하는 역할을 합니다. 메모리, 실행기, 트리거는 해당 스케줄링에 따라 정상적으로 실행됩니다. 여러 가지 스케줄러가 있습니다.
특정 시나리오가 아닌 가장 일반적으로 사용되는 스케줄러는 BlockingScheduler입니다.
예외 모니터링
예약된 작업이 실행 중일 때 오류가 발생하면 일반적으로 로깅 모듈을 사용하여 오류 정보를 기록해야 합니다.
사용 예:
from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging # logging日志配置打印格式及保存位置 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='sche.log', filemode='a') def log_listen(event): if event.exception : print ( '任务出错,报错信息:{}'.format(event.exception)) else: print ( '任务正常运行...' ) def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0) sched = BlockingScheduler() # 使用mysql存储任务 sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' sched.add_jobstore('sqlalchemy',url=sql_url) # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') # 配置任务执行完成及错误时的监听 sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志监听 sched._logger = logging sched.start()
apscheduler 패키지 사용
上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR import logging import logging.handlers import os import datetime class LoggerUtils(): def init_logger(self, logger_name): # 日志格式 formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') log_obj = logging.getLogger(logger_name) log_obj.setLevel(logging.INFO) # 设置log存储位置 path = '/data/logs/' filename = '{}{}.log'.format(path, logger_name) if not os.path.exists(path): os.makedirs(path) # 设置日志按照时间分割 timeHandler = logging.handlers.TimedRotatingFileHandler( filename, when='D', # 按照什么维度切割, S:秒,M:分,H:小时,D:天,W:周 interval=1, # 多少天切割一次 backupCount=10 # 保留几天 ) timeHandler.setLevel(logging.INFO) timeHandler.setFormatter(formatter) log_obj.addHandler(timeHandler) return log_obj class Scheduler(LoggerUtils): def __init__(self): # 执行器设置 executors = { 'default': ThreadPoolExecutor(10), # 设置一个名为“default”的ThreadPoolExecutor,其worker值为10 'processpool': ProcessPoolExecutor(5) # 设置一个名为“processpool”的ProcessPoolExecutor,其worker值为5 } self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors) # 存储器设置 # 这里使用sqlalchemy存储器,将任务存储在mysql sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' self.scheduler.add_jobstore('sqlalchemy',url=sql_url) def log_listen(event): if event.exception: # 日志记录 self.scheduler._logger.error(event.traceback) # 配置任务执行完成及错误时的监听 self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) # 配置日志监听 self.scheduler._logger = self.init_logger('sche_test') def add_job(self, *args, **kwargs): """添加任务""" self.scheduler.add_job(*args, **kwargs) def start(self): """开启任务""" self.scheduler.start() # 测试任务 def sch_test(job_type): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0) # 添加任务,开启任务 sched = Scheduler() # 添加任务 sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5') # 开启任务 sched.start()
【相关推荐:Python3视频教程 】
위 내용은 Python의 예약된 작업 apscheduler 구현에 대한 자세한 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!