Linux 서버에서 정기적으로 Python 스크립트를 실행하려는 경우 가장 유명한 선택은 Crontab 스크립트여야 하지만 Crontab에는 다음과 같은 단점이 있습니다.
<code style="font-family: monospace; font-size: inherit; background-color: rgba(0, 0, 0, 0.06); padding: 0px 2px; border-radius: 6px; line-height: inherit; overflow-wrap: break-word; text-indent: 0px;"><strong>1.不方便执行<strong>秒级的任务</strong>。</strong>
<strong>2.当需要执行的定时任务有上百个的时候,Crontab的<strong>管理就会特别不方便</strong>。</strong>
另外一个选择是 Celery,但是 Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。
在你想要使用一个轻量级的任务调度工具,而且希望它尽量简单、容易使用、不需要外部依赖,最好能够容纳 Crontab 的所有基本功能,那么 Schedule 模块是你的不二之选。
使用它来调度任务可能只需要几行代码,感受一下:
# Python 实用宝典 import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) while True: schedule.run_pending() time.sleep(1)
上面的代码表示每10分钟执行一次 job 函数,非常简单方便。你只需要引入 schedule 模块,通过调用 <strong>scedule.every(时间数).时间类型.do(job)</strong>
发布周期任务。
发布后的周期任务需要用 <strong>run_pending</strong>
函数来检测是否执行,因此需要一个 <strong>While</strong>
1. 2단계 작업을 수행하는 것이 불편합니다.
<strong>2. 실행해야 할 예약된 작업이 수백 개 있는 경우 Crontab의 </strong> 관리 특히 불편할 거예요</em>. </span>
또 다른 옵션은 Celery이지만 Celery의 구성이 더 번거롭기 때문에 가벼운 일정 관리 도구만 필요한 경우 Celery는 좋은 선택이 아닙니다.
가벼운 작업 일정 도구를 사용하고 외부 종속성 없이 최대한 간단하고 사용하기 쉽고 Crontab의 모든 기본 기능을 수용할 수 있기를 원한다면 일정 모듈이 최선의 선택입니다.
작업을 예약하는 데 사용하는 데는 몇 줄의 코드만 필요할 수 있습니다. 느껴보세요:pip install schedule
<span style="color: #666666;">scedule.every(시간 번호).time type.do(작업)</span></ code ></p><p ><span style="font-size: 18px;"> 정기적인 작업을 게시하세요. <em><strong>출시 후 정기적인 작업에서는 </strong></em> </span><code style="font-family: monospace; 글꼴 크기: 상속; 배경색: rgba(0, 0, 0, 0.06); 패딩: 0px 2px; border-radius: 6px; line-height: receive; Overflow-wrap: text-indent: 0px;"></p>run_pending<p >
함수를 사용하여 실행 여부를 감지합니다. a
<strong>While</strong>
다음에서는 Schedule 모듈의 설치와 기본 및 고급 사용법을 자세히 설명합니다.
1. 준비
종속성 설치 명령을 입력하려면 다음 방법 중 하나를 선택하세요.
1 Windows 환경 Cmd(시작-실행-CMD)를 엽니다.2. MacOS 환경 터미널을 엽니다(터미널에 들어가려면 Command+Space).
3. VSCode 편집기나 Pycharm을 사용하는 경우 인터페이스 하단의 터미널을 직접 사용할 수 있습니다.# Python 实用宝典 import schedule import time def job(): print("I'm working...") # 每十分钟执行任务 schedule.every(10).minutes.do(job) # 每个小时执行任务 schedule.every().hour.do(job) # 每天的10:30执行任务 schedule.every().day.at("10:30").do(job) # 每个月执行任务 schedule.every().monday.do(job) # 每个星期三的13:15分执行任务 schedule.every().wednesday.at("13:15").do(job) # 每分钟的第17秒执行任务 schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
2. 기본 사용법은🎜🎜🎜🎜🎜 제가 거기에 있었던 기사의 시작 부분에 언급되어 있으며 다음은 작업 예약에 대한 추가 예입니다. 🎜
# Python 实用宝典 import schedule import time def job_that_executes_once(): # 此处编写的任务只会执行一次... return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
# Python 实用宝典 import schedule def greet(name): print('Hello', name) # do() 将额外的参数传递给job函数 schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob')
# Python 实用宝典 import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()
# Python 实用宝典 import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().second.do(greet) schedule.clear()
# Python 实用宝典 import schedule def greet(name): print('Hello {}'.format(name)) # .tag 打标签 schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') # get_jobs(标签):可以获取所有该标签的任务 friends = schedule.get_jobs('friend') # 取消所有 daily-tasks 标签的任务 schedule.clear('daily-tasks')
# Python 实用宝典 import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # 每个小时运行作业,18:30后停止 schedule.every(1).hours.until("18:30").do(job) # 每个小时运行作业,2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # 每个小时运行作业,8个小时后停止 schedule.every(1).hours.until(timedelta(hours=8)).do(job) # 每个小时运行作业,11:32:42后停止 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # 每个小时运行作业,2020-5-17 11:36:20后停止 schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
# Python 实用宝典 import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # 立即运行所有作业,每次作业间隔10秒 schedule.run_all(delay_seconds=10)
如果某个机制触发了,你需要立即运行所有作业,可以调用 <strong>schedule.run_all()</strong>
:
# Python 实用宝典 import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # 立即运行所有作业,每次作业间隔10秒 schedule.run_all(delay_seconds=10)
3.高级使用
装饰器安排作业
如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:
# Python 实用宝典 from schedule import every, repeat, run_pending import time # 此装饰器效果等同于 schedule.every(10).minutes.do(job) @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)
并行执行
默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。
不过你可以通过多线程的形式来运行每个作业以解决此限制:
# Python 实用宝典 import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
日志记录
Schedule 模块同时也支持 logging 日志记录,这么使用:
# Python 实用宝典 import schedule import logging logging.basicConfig() schedule_logger = logging.getLogger('schedule') # 日志级别为DEBUG schedule_logger.setLevel(level=logging.DEBUG) def job(): print("Hello, Logs") schedule.every().second.do(job) schedule.run_all() schedule.clear()
效果如下:
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={}) Hello, Logs DEBUG:schedule:Deleting *all* jobs
异常处理
Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。
你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:
# Python 实用宝典 import functools def catch_exceptions(cancel_on_failure=False): def catch_exceptions_decorator(job_func): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper return catch_exceptions_decorator @catch_exceptions(cancel_on_failure=True) def bad_task(): return 1 / 0 schedule.every(5).minutes.do(bad_task)
这样,<strong>bad_task</strong>
在执行时遇到的任何错误,都会被 <strong>catch_exceptions </strong>
捕获,这点在保证调度任务正常运转的时候非常关键。
위 내용은 매우 실용적입니다! Python의 주기적 작업 아티팩트인 Schedule 모듈!의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!