Maison > développement back-end > Tutoriel Python > Tellement pratique ! Module de planification, l'artefact de tâche périodique de Python !

Tellement pratique ! Module de planification, l'artefact de tâche périodique de Python !

王林
Libérer: 2023-04-12 14:01:08
avant
2093 Les gens l'ont consulté

Tellement pratique ! Module de planification, l'artefact de tâche périodique de Python !

Si vous souhaitez exécuter périodiquement un script Python sur un serveur Linux, le choix le plus connu devrait être le script Crontab, mais Crontab présente les défauts suivants :

<code style="font-family: monospace; font-size: inherit; background-color: rgba(0, 0, 0, 0.06); padding: 0px 2px; border-radius: 6px; line-height: inherit; overflow-wrap: break-word; text-indent: 0px;"><strong>​1.不方便执行<strong>秒级的任务</strong>。​</strong> 

<strong>​2.当需要执行的定时任务有上百个的时候,Crontab的<strong>管理就会特别不方便</strong>。​</strong> 

另外一个选择是 Celery,但是 Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。

在你想要使用一个轻量级的任务调度工具,而且希望它尽量简单、容易使用、不需要外部依赖,最好能够容纳 Crontab 的所有基本功能,那么 Schedule 模块是你的不二之选。

使用它来调度任务可能只需要几行代码,感受一下:

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Copier après la connexion

上面的代码表示每10分钟执行一次 job 函数,非常简单方便。你只需要引入 schedule 模块,通过调用 <strong>​scedule.every(时间数).时间类型.do(job)​</strong> 发布周期任务。

发布后的周期任务需要用 <strong>​run_pending​</strong> 函数来检测是否执行,因此需要一个 <strong>​While​</strong>​1. Il n'est pas pratique d'effectuer des tâches de deuxième niveau. ​

<strong>​2. Lorsqu'il y a des centaines de tâches planifiées qui doivent être exécutées, la gestion </strong> de Crontab sera particulièrement gênant</em>. ​</span>

Celery est une autre option, mais la configuration de Celery est plus compliquée. Si vous avez juste besoin d'un outil de planification léger, Celery ne sera pas un bon choix.

Si vous souhaitez utiliser un outil de planification de tâches léger et espérez qu'il soit aussi simple et facile à utiliser que possible sans dépendances externes, et qu'il puisse de préférence prendre en charge toutes les fonctions de base de Crontab, alors le module Planification est votre meilleur choix.

L'utiliser pour planifier des tâches peut ne nécessiter que quelques lignes de code, ressentez-le :

pip install schedule
Copier après la connexion
Le code ci-dessus signifie que la fonction de travail est exécutée toutes les 10 minutes, ce qui est très simple et pratique. Il vous suffit d'introduire le module de planification en appelant

<span style="color: #666666;">​scedule.every(time number).time type.do(job)​</span></ code ></p><p ><span style="font-size: 18px;"> Publier des tâches périodiques. <em><strong>Les tâches périodiques après la sortie doivent utiliser </strong></em> </span>​<code style="font-family: monospace; font-size: Heherit; background-color: rgba(0, 0, 0, 0.06); padding: 0px 2px; border-radius : 6px ; line-height : hériter ; overflow-wrap : break-word ; text-indent : 0px;"></p>​run_ending​<p >

fonction pour détecter s'il faut exécuter, donc a

<strong>​While​</strong>

loop interroge continuellement cette fonction.

Ce qui suit parlera de l'installation et de l'utilisation de base et avancée du module Schedule.

1. Préparation

Veuillez choisir l'une des méthodes suivantes pour saisir la commande pour installer les dépendances :

1. Environnement Windows Ouvrez Cmd (Démarrer-Exécuter-CMD).

2. Environnement MacOS Ouvrez le Terminal (commande + espace pour entrer dans le Terminal).

3. Si vous utilisez l'éditeur VSCode ou Pycharm, vous pouvez utiliser directement le terminal en bas de l'interface.

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

# 每十分钟执行任务
schedule.every(10).minutes.do(job)
# 每个小时执行任务
schedule.every().hour.do(job)
# 每天的10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每个月执行任务
schedule.every().monday.do(job)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Copier après la connexion

🎜2. mentionné au début de l'article, j'y suis allé, et voici d'autres exemples de tâches de planification : 🎜
# Python 实用宝典
import schedule
import time

def job_that_executes_once():
    # 此处编写的任务只会执行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
Copier après la connexion
🎜 Vous pouvez voir que la configuration des mois aux secondes est couverte par les exemples ci-dessus. Mais🎜si vous souhaitez exécuter la tâche une seule fois🎜, vous pouvez la configurer comme ceci :🎜
# Python 实用宝典
import schedule

def greet(name):
    print('Hello', name)

# do() 将额外的参数传递给job函数
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
Copier après la connexion
🎜🎜Passer des paramètres🎜🎜🎜Si vous avez des paramètres qui doivent être transmis au travail pour l'exécution, il vous suffit de le faire :🎜
# Python 实用宝典
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()
Copier après la connexion
🎜🎜Obtenir tous les paramètres actuels Jobs 🎜🎜🎜 Si vous souhaitez obtenir tous les jobs en cours : 🎜
# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()
Copier après la connexion
🎜🎜 Annuler tous les jobs 🎜🎜🎜 Si un mécanisme est déclenché, vous devez effacer tous les jobs du programme en cours immédiatement : 🎜
# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 标签的任务
schedule.clear('daily-tasks')
Copier après la connexion
🎜🎜 Fonction Tag 🎜🎜🎜 in Lors de la création d'un travail, afin de faciliter la gestion du travail ultérieurement, vous pouvez taguer le travail, afin de pouvoir filtrer le travail pour obtenir le travail ou annuler le travail . 🎜
# Python 实用宝典
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# 每个小时运行作业,18:30后停止
schedule.every(1).hours.until("18:30").do(job)

# 每个小时运行作业,2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 每个小时运行作业,8个小时后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 每个小时运行作业,11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# 每个小时运行作业,2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
Copier après la connexion
🎜🎜Définir la date limite du travail🎜🎜🎜Si vous avez besoin qu'un travail soit dû dans un certain délai, vous pouvez utiliser cette méthode : 🎜
# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)
Copier après la connexion
Copier après la connexion
🎜Après la date limite, le travail ne pourra pas être exécuté. 🎜🎜🎜Exécutez toutes les tâches immédiatement, quel que soit leur emploi du temps🎜🎜

如果某个机制触发了,你需要立即运行所有作业,可以调用 <strong>​schedule.run_all()​</strong> :

# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)
Copier après la connexion
Copier après la connexion

3.高级使用

装饰器安排作业

如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:

# Python 实用宝典
from schedule import every, repeat, run_pending
import time

# 此装饰器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)
Copier après la connexion

并行执行

默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。

不过你可以通过多线程的形式来运行每个作业以解决此限制:

# Python 实用宝典
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)
Copier après la connexion

日志记录

Schedule 模块同时也支持 logging 日志记录,这么使用:

# Python 实用宝典
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志级别为DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()
Copier après la connexion

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs
Copier après la connexion

异常处理

Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。

你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:

# Python 实用宝典
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)
Copier après la connexion

这样,<strong>​bad_task​</strong> 在执行时遇到的任何错误,都会被 <strong>​catch_exceptions ​</strong> 捕获,这点在保证调度任务正常运转的时候非常关键。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:51cto.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal