
Detailed analysis of implementing scheduled tasks in Python with apscheduler

WBOY
Release: 2022-10-10 16:29:33

This article shows how to implement scheduled tasks in Python. Several third-party packages can manage scheduled tasks; of these, apscheduler is relatively simple to use. Let's take a look at how it works.


First introduction to apscheduler

Let's start with a simple example to see how apscheduler is used.

# encoding: utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, testing apscheduler'.format(now))
task = BlockingScheduler()
# Run sch_test whenever the seconds value is a multiple of 10
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

The example above is simple: create a scheduler object, add a job with add_job, and then start the scheduler.

It runs the sch_test job every 10 seconds. The output looks like this:

Time: 2022-10-08 15:16:30, testing apscheduler
Time: 2022-10-08 15:16:40, testing apscheduler
Time: 2022-10-08 15:16:50, testing apscheduler
Time: 2022-10-08 15:17:00, testing apscheduler

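To pass arguments to the job function, add args to the add_job call, for example task.add_job(func=sch_test, args=('a',), trigger='cron', second='*/10'). args must be a list or tuple, so a single argument needs the trailing comma, and sch_test must be defined to accept that argument.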
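The difference between `('a')` and `('a',)` is easy to miss, so here is a minimal check using only the standard library. The helper name `describe` is made up for this illustration; it is not part of apscheduler.

```python
def describe(value):
    """Return the type name of value, to show what the parentheses really build."""
    return type(value).__name__


without_comma = ('a')   # parentheses only group the expression: this is a str
with_comma = ('a',)     # the trailing comma makes a one-element tuple

print(describe(without_comma))  # str
print(describe(with_comma))     # tuple
```

Since add_job expects args to be a list or tuple, the second form (or the list `['a']`) is the one to use.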

What modules does apscheduler have?

The example above gave us a first look at how to use apscheduler. Next, we need to understand how apscheduler is designed. It has four main modules: triggers, job stores (jobstores), executors, and schedulers.

1. Triggers:

A trigger determines how a job is scheduled. The example above uses the "cron" trigger. The available triggers are cron, date, and interval.

cron represents a scheduled task, similar to Linux crontab, that fires at specified times.

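The available parameters are as follows:

year: 4-digit year
month: month (1-12)
day: day of the month (1-31)
week: ISO week (1-53)
day_of_week: number or name of the weekday (0-6, or mon, tue, wed, thu, fri, sat, sun)
hour: hour (0-23)
minute: minute (0-59)
second: second (0-59)
start_date: earliest date/time to trigger on (inclusive)
end_date: latest date/time to trigger on (inclusive)
timezone: time zone used for the date/time calculations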

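In addition, each cron field accepts expressions. Commonly used ones are:

* : fire on every value
*/a : fire every a values, starting from the minimum
a-b : fire on any value within the a-b range (a must be smaller than b)
a-b/c : fire every c values within the a-b range
xth y : fire on the x-th occurrence of weekday y within the month (day field only)
last x : fire on the last occurrence of weekday x within the month (day field only)
last : fire on the last day of the month (day field only)
x,y,z : fire on any of the listed expressions; any number of the expressions above can be combined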
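To make the step and range syntax concrete, here is a rough standard-library sketch that expands a simplified field expression into the values it matches. This is not apscheduler's own parser: `expand_field` is a hypothetical helper, and it only handles the numeric forms (`*`, `*/a`, `a-b`, `a-b/c`, single values, and comma-separated lists).

```python
def expand_field(expr, low, high):
    """Expand a simplified cron field expression into the sorted values it matches.

    low and high are the minimum and maximum legal values of the field,
    for example 0 and 59 for seconds.
    """
    values = set()
    for part in expr.split(','):
        step = 1
        if '/' in part:
            part, step_text = part.split('/')
            step = int(step_text)
        if part == '*':
            first, last = low, high
        elif '-' in part:
            first_text, last_text = part.split('-')
            first, last = int(first_text), int(last_text)
        else:
            first = last = int(part)
        values.update(range(first, last + 1, step))
    return sorted(values)


print(expand_field('*/10', 0, 59))    # [0, 10, 20, 30, 40, 50]
print(expand_field('8-18/5', 0, 23))  # [8, 13, 18]
print(expand_field('7,20', 0, 59))    # [7, 20]
```

The first call explains the earlier example: second='*/10' matches seconds 0, 10, 20, 30, 40, and 50 of every minute.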

Usage example, run once every day at 7:20:

task.add_job(func=sch_test, args=('Timed task',), trigger='cron', hour='7', minute='20')

date represents a one-time task that runs at a specific point in time.

Usage example:

# Specify the run time with run_date
task.add_job(func=sch_test, trigger='date', run_date=datetime.datetime(2022, 10, 8, 16, 1, 30))
# Or use next_run_time
task.add_job(func=sch_test, trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

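interval represents a recurring task: you specify an interval, and the job runs each time that interval elapses.

interval accepts the following parameters:

weeks: number of weeks to wait
days: number of days to wait
hours: number of hours to wait
minutes: number of minutes to wait
seconds: number of seconds to wait
start_date: starting point for the interval calculation
end_date: latest date/time to trigger on
timezone: time zone used for the date/time calculations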

Usage example, run the sch_test task every 3 seconds:

task.add_job(func=sch_test, args=('interval task',), trigger='interval', seconds=3)

The following example uses all three triggers:

# encoding: utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {} testing apscheduler'.format(now, job_type))
task = BlockingScheduler()
# One-time task: runs once, 3 seconds from now
task.add_job(func=sch_test, args=('one-time task',), trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
# Cron task: runs whenever the seconds value is a multiple of 5
task.add_job(func=sch_test, args=('cron task',), trigger='cron', second='*/5')
# Interval task: runs every 3 seconds, counted from when the job was added
task.add_job(func=sch_test, args=('interval task',), trigger='interval', seconds=3)
task.start()

Part of the output:

Time: 2022-10-08 15:45:49, one-time task testing apscheduler
Time: 2022-10-08 15:45:49, interval task testing apscheduler
Time: 2022-10-08 15:45:50, cron task testing apscheduler
Time: 2022-10-08 15:45:52, interval task testing apscheduler
Time: 2022-10-08 15:45:55, cron task testing apscheduler
Time: 2022-10-08 15:45:55, interval task testing apscheduler
Time: 2022-10-08 15:45:58, interval task testing apscheduler

The code and its output make the differences between the triggers clear.
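One difference visible in the output is that the interval job fires at 15:45:49, :52, :55, and :58, while the cron job fires at :50 and :55. The standard-library sketch below reproduces those times under the assumption that the program started at 15:45:46 (3 seconds before the one-time task ran). It is a simplified model of the two behaviours, not apscheduler's implementation, and both function names are hypothetical.

```python
from datetime import datetime, timedelta


def interval_fire_times(start, seconds, until):
    """Interval model: fire at start + n * interval, counted from the start time."""
    times = []
    current = start + timedelta(seconds=seconds)
    while current <= until:
        times.append(current)
        current += timedelta(seconds=seconds)
    return times


def cron_step_fire_times(start, step, until):
    """Cron model for second='*/step': fire on wall-clock seconds divisible by step."""
    times = []
    current = start.replace(microsecond=0) + timedelta(seconds=1)
    while current <= until:
        if current.second % step == 0:
            times.append(current)
        current += timedelta(seconds=1)
    return times


start = datetime(2022, 10, 8, 15, 45, 46)   # assumed program start time
until = datetime(2022, 10, 8, 15, 45, 58)

interval_times = [t.strftime('%H:%M:%S') for t in interval_fire_times(start, 3, until)]
cron_times = [t.strftime('%H:%M:%S') for t in cron_step_fire_times(start, 5, until)]

print(interval_times)  # ['15:45:49', '15:45:52', '15:45:55', '15:45:58']
print(cron_times)      # ['15:45:50', '15:45:55']
```

In short, an interval is measured from when the job starts counting, whereas a cron expression is aligned to the clock.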

2. Job stores (jobstores)

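As the name suggests, a job store is where jobs are kept. By default they are kept in memory, but we can also choose another storage method, such as saving jobs in MySQL. The options are:

MemoryJobStore: keeps jobs in memory (the default)
SQLAlchemyJobStore: stores jobs in any database supported by SQLAlchemy, such as MySQL, PostgreSQL, or SQLite
MongoDBJobStore: stores jobs in MongoDB
RedisJobStore: stores jobs in Redis
RethinkDBJobStore: stores jobs in RethinkDB
ZooKeeperJobStore: stores jobs in ZooKeeper

The default in-memory store is usually enough, but jobs kept in memory are lost when the program exits, so after a crash and restart they have to be added again before they can run. If jobs must survive restarts, choose one of the persistent job stores.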

Example of using SQLAlchemyJobStore storage:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {} testing apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# Store jobs in MySQL
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy', url=sql_url)
# Add the job
sched.add_job(func=sch_test, args=('scheduled task',), trigger='cron', second='*/5')
sched.start()

3. Executors

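An executor runs jobs, typically by submitting them to a thread pool or process pool. The options are:

ThreadPoolExecutor: runs jobs in a thread pool
ProcessPoolExecutor: runs jobs in a process pool
AsyncIOExecutor: runs jobs in an asyncio event loop
GeventExecutor: runs jobs as gevent greenlets
TornadoExecutor: runs jobs in a Tornado IOLoop
TwistedExecutor: runs jobs in the Twisted reactor's thread pool
DebugExecutor: runs jobs directly in the calling thread, for debugging

The default is ThreadPoolExecutor, and the thread pool and process pool executors are the ones most commonly used. If the job is CPU-intensive, ProcessPoolExecutor can be used to run it.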

4. Schedulers

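The scheduler is the core of apscheduler. It coordinates the whole system: job stores, executors, and triggers all operate under its control. The available schedulers are:

BlockingScheduler: for when the scheduler is the only thing running in the process; start() blocks
BackgroundScheduler: runs in a background thread inside your application
AsyncIOScheduler: for applications that use asyncio
GeventScheduler: for applications that use gevent
TornadoScheduler: for Tornado applications
TwistedScheduler: for Twisted applications
QtScheduler: for Qt applications

Outside of these specific scenarios, the most commonly used scheduler is BlockingScheduler.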

Exception monitoring

If a scheduled task fails while it is running, we need a way to find out. This is done by registering an event listener on the scheduler, and we usually use the logging module to record the error information.

Usage example:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
# Configure the log format and where the log file is saved
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')
def log_listen(event):
    # event.exception is set only when the job raised an error
    if event.exception:
        print('Job failed, error: {}'.format(event.exception))
    else:
        print('Job ran normally...')
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {} testing apscheduler'.format(now, job_type))
    print(1/0)  # deliberately raise ZeroDivisionError to trigger the error event
sched = BlockingScheduler()
# Store jobs in MySQL
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy', url=sql_url)
# Add the job
sched.add_job(func=sch_test, args=('scheduled task',), trigger='cron', second='*/5')
# Listen for job completion and job errors
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Send the scheduler's own log messages through the logging configuration above
# (_logger is a private attribute of the scheduler)
sched._logger = logging
sched.start()

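Wrapping apscheduler for reuse

The sections above introduced the main modules of the apscheduler framework, so we now have a basic grasp of how to use it. Next, let's wrap apscheduler in a class; in the future you can simply adapt this code.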

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
    def init_logger(self, logger_name):
        # Log format
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # Where the log files are stored
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # Rotate the log file by time
        timeHandler = logging.handlers.TimedRotatingFileHandler(
            filename,
            when='D',  # rotation unit: S seconds, M minutes, H hours, D days, W0-W6 weekday (0 = Monday)
            interval=1,  # rotate once every 1 unit (here, every day)
            backupCount=10  # number of old log files to keep
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj
class Scheduler(LoggerUtils):
    def __init__(self):
        # Executor settings
        executors = {
            'default': ThreadPoolExecutor(10),  # a ThreadPoolExecutor named "default" with 10 workers
            'processpool': ProcessPoolExecutor(5)  # a ProcessPoolExecutor named "processpool" with 5 workers
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # Job store settings
        # Use the sqlalchemy job store to keep jobs in MySQL
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy', url=sql_url)
        def log_listen(event):
            if event.exception:
                # Record the traceback in the log
                self.scheduler._logger.error(event.traceback)

        # Listen for job completion and job errors
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # Send the scheduler's log messages to the rotating log file
        self.scheduler._logger = self.init_logger('sche_test')
    def add_job(self, *args, **kwargs):
        """Add a job."""
        self.scheduler.add_job(*args, **kwargs)
    def start(self):
        """Start the scheduler."""
        self.scheduler.start()
# Test job
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('Time: {}, {} testing apscheduler'.format(now, job_type))
    print(1/0)  # deliberately raise ZeroDivisionError to exercise the error logging
# Create the scheduler wrapper
sched = Scheduler()
# Add the job
sched.add_job(func=sch_test, args=('scheduled task',), trigger='cron', second='*/5')
# Start the scheduler
sched.start()
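When adapting the wrapper, one detail that may be worth changing is the logging helper: calling init_logger twice with the same name attaches a second handler, so every message is written twice, and the log directory is hard-coded. The standard-library sketch below is one possible variation, not part of the original code; the `log_dir` parameter and the temporary directory are assumptions made so that the example runs anywhere.

```python
import logging
import logging.handlers
import os
import tempfile


def init_logger(logger_name, log_dir):
    """Return a logger that writes to <log_dir>/<logger_name>.log, rotating daily."""
    os.makedirs(log_dir, exist_ok=True)
    log_obj = logging.getLogger(logger_name)
    log_obj.setLevel(logging.INFO)
    if not log_obj.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.handlers.TimedRotatingFileHandler(
            os.path.join(log_dir, logger_name + '.log'),
            when='D', interval=1, backupCount=10,
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s'))
        log_obj.addHandler(handler)
    return log_obj


log_dir = tempfile.mkdtemp()  # temporary directory, used only for this demo
first = init_logger('sche_test', log_dir)
second = init_logger('sche_test', log_dir)  # same logger, no extra handler
first.error('example error entry')
print(first is second, len(first.handlers))  # True 1
```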

source:juejin.im