Home > Backend Development > Python Tutorial > How to install and use the third-party module apscheduler in Python?

How to install and use the third-party module apscheduler in Python?

王林
Release: 2023-05-08 10:04:17
forward
1679 people have browsed it

    apscheduler module

    Install apscheduler module

    pip install apscheduler
    Copy after login

    apscheduler module introduction

    APScheduler( Advanced Python Scheduler) is a lightweight Python scheduled task scheduling framework (Python library).

    APScheduler has three built-in scheduling systems, including:

    • cron-style scheduling (optional start/end time)

    • Interval-based execution (run the job at even intervals, you can also choose the start/end time)

    • One-time delayed execution of the task (run the job once at the specified date/time)

    Supported backend storage jobs

    APScheduler can arbitrarily mix and match the backend of the scheduling system and job storage. Supported backend storage jobs include:

    • Memory

    • ##SQLAlchemy

    • MongoDB

    • Redis

    • RethinkDB

    • ZooKeeper

    APScheduler has four components

    • Triggers (triggers) contain scheduling logic, and each job has its own trigger to determine the next running time. Apart from their own initial configuration, triggers are completely stateless.

    • job stores (job storage) store scheduled jobs. The default job store simply saves the job in memory, and other job stores save the job in the database. . When a job is saved to a persistent job store, the job's data is serialized and deserialized on load. Job storage cannot share the scheduler.

    • Executors handle the running of jobs. They usually do this by submitting specified callable objects to a thread or process pool in the job. When the job is completed, the executor will notify the scheduler.

    • schedulers (Scheduler) Configuring job storage and executors can be done in the scheduler, such as adding, modifying and removing jobs. Different schedulers can be selected according to different application scenarios. There are 7 options available: BlockingScheduler, BackgroundScheduler, AsyncIOScheduler, GeventScheduler, TornadoScheduler, TwistedScheduler, and QtScheduler.

    Introduction to each component

    Trigger

    When you schedule a job, you need to select a trigger for this job to describe this When the job is triggered, APScheduler has three built-in trigger types:

    • date: a one-time specified date;

    • interval: at a certain time How often to execute within a time range;

    • cron: Linux crontab format is compatible and the most powerful.

    date The most basic kind of scheduling, the job will only be executed once. Its parameters are as follows:

    1.run_date

    (datetime|str) – The running date or time of the job

    2.timezone

    (datetime.tzinfo|str) &ndash ; Specify time zone

    Job Storage

    If your application will re-create the job every time it is started, then use the default job store (MemoryJobStore), But if you need to retain jobs despite scheduler restarts or application crashes, you should choose a specific job store based on your application environment. For example: use Mongo or SQLAlchemy JobStore (used to support most RDBMS)

    executor

    The choice of executor depends on which of the above frameworks you use, most In this case, using the default ThreadPoolExecutor can already meet the needs. If your application involves

    CPU-intensive operations, you may consider using ProcessPoolExecutor to use more CPU cores. You can also use both at the same time, using ProcessPoolExecutor as the second executor.

    Choose the right scheduler

    • BlockingScheduler: When the scheduler is the only thing running in your application

    • BackgroundScheduler : Use when you are not running any other framework and want the scheduler to execute in the background of your app.

    • AsyncIOScheduler: Used when your program uses asyncio (an asynchronous framework).

    • GeventScheduler: Used when your program uses gevent (a high-performance Python concurrency framework).

    • TornadoScheduler: Used when your program is based on Tornado (a web framework).

    • TwistedScheduler: Use when your program uses Twisted (an asynchronous framework)

    • QtScheduler: If your application is a Qt Can be used when applying.

    apscheduler module uses

    Add job

    There are two ways to add a new job:

    add_job to add a job ;

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job1():
        print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    def my_job2():
        print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    # 每隔5秒运行一次my_job1
    sched.add_job(my_job1, 'interval', seconds=5, id='my_job1')
    # 每隔5秒运行一次my_job2
    sched.add_job(my_job2, 'cron', second='*/5', id='my_job2')
    sched.start()
    Copy after login

    Decorator mode adds jobs.

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    sched = BlockingScheduler()
    
    # 每隔5秒运行一次my_job1
    @sched.scheduled_job('interval', seconds=5, id='my_job1')
    def my_job1():
        print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    # 每隔5秒运行一次my_job2
    @sched.scheduled_job('cron', second='*/5', id='my_job2')
    def my_job2():
        print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched.start()
    Copy after login

    Remove job

    No removal job

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    sched.start()
    Copy after login

    Code execution result:

    How to install and use the third-party module apscheduler in Python?

    使用remove() 移除作业

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    job.remove()
    # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    sched.start()
    Copy after login

    代码执行结果:

    How to install and use the third-party module apscheduler in Python?

    使用remove_job()移除作业

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    sched.remove_job('my_job_id')
    sched.start()
    Copy after login

    代码执行结果:

    How to install and use the third-party module apscheduler in Python?

    触发器类型

    APScheduler有3中内置的触发器类型:

    • 新建一个调度器(scheduler);

    • 添加一个调度任务(job store);

    • 运行调度任务。

    代码实现

    # -*- coding:utf-8 -*-
    import time
    import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    def my_job(text="默认值"):
        print(text, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
    sched = BlockingScheduler()
    sched.add_job(my_job, 'interval', seconds=3, args=['3秒定时'])
    # 2018-3-17 00:00:00 执行一次,args传递一个text参数
    sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根据年月日定时执行'])
    # 2018-3-17 13:46:00 执行一次,args传递一个text参数
    sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根据年月日时分秒定时执行'])
    # sched.start()
    """
    interval 间隔调度,参数如下:
        weeks (int) – 间隔几周
        days (int) – 间隔几天
        hours (int) – 间隔几小时
        minutes (int) – 间隔几分钟
        seconds (int) – 间隔多少秒
        start_date (datetime|str) – 开始日期
        end_date (datetime|str) – 结束日期
        timezone (datetime.tzinfo|str) – 时区
    """
    """
    cron参数如下:
        year (int|str) – 年,4位数字
        month (int|str) – 月 (范围1-12)
        day (int|str) – 日 (范围1-31)
        week (int|str) – 周 (范围1-53)
        day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
        hour (int|str) – 时 (范围0-23)
        minute (int|str) – 分 (范围0-59)
        second (int|str) – 秒 (范围0-59)
        start_date (datetime|str) – 最早开始日期(包含)
        end_date (datetime|str) – 最晚结束时间(包含)
        timezone (datetime.tzinfo|str) – 指定时区
    """
    # my_job将会在6,7,8,11,12月的第3个周五的1,2,3点运行
    sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
    # 截止到2018-12-30 00:00:00,每周一到周五早上五点半运行job_function
    sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31')
    
    # 表示2017年3月22日17时19分07秒执行该程序
    sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
    
    # 表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
    sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
    
    # 表示从星期一到星期五5:30(AM)直到2014-05-30 00:00:00
    sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
    
    # 表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
    sched.add_job(my_job, 'cron', second='*/5', args=['5秒定时'])
    sched.start()
    Copy after login
    cron表达式参数描述
    *anyFire on every value
    */aanyFire every a values, starting from the minimum
    a-banyFire on any value within the a-b range (a must be smaller than b)
    a-b/canyFire every c values within the a-b range
    xth ydayFire on the x -th occurrence of weekday y within the month
    last xdayFire on the last occurrence of weekday x within the month
    lastdayFire on the last day within the month
    x,y,zanyFire on any matching expression; can combine any number of any of the above expressions

    使用SQLAlchemy作业存储器存放作业

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime
    import logging
    sched = BlockingScheduler()
    
    def my_job():
        print('my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    # 使用sqlalchemy作业存储器
    # 根据自己电脑安装的库选择用什么连接 ,如pymysql   其中:scrapy表示数据库的名称,操作数据库之前应创建对应的数据库
    url = 'mysql+pymysql://root:123456@localhost:3306/scrapy?charset=utf8'
    sched.add_jobstore('sqlalchemy', url=url)
    # 添加作业
    sched.add_job(my_job, 'interval', id='myjob', seconds=5)
    
    log = logging.getLogger('apscheduler.executors.default')
    log.setLevel(logging.INFO)  # DEBUG
    # 设定日志格式
    fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
    h = logging.StreamHandler()
    h.setFormatter(fmt)
    log.addHandler(h)
    
    sched.start()
    Copy after login

    暂停和恢复作业

    # 暂停作业:
    apsched.job.Job.pause()
    apsched.schedulers.base.BaseScheduler.pause_job()
    # 恢复作业:
    apsched.job.Job.resume()
    apsched.schedulers.base.BaseScheduler.resume_job()
    Copy after login

    获得job列表

    • get_jobs(),它会返回所有的job实例;

    • 使用print_jobs()来输出所有格式化的作业列表;

    • get_job(job_id=“任务ID”)获取指定任务的作业列表。

    代码实现:

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    
    print(sched.get_jobs())
    
    print(sched.get_job(job_id="my_job_id"))
    
    sched.print_jobs()
    sched.start()
    Copy after login

    关闭调度器

    默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。

    sched.shutdown()
    sched.shutdown(wait=False)
    Copy after login

    The above is the detailed content of How to install and use the third-party module apscheduler in Python?. For more information, please follow other related articles on the PHP Chinese website!

    Related labels:
    source:yisu.com
    Statement of this Website
    The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
    Popular Tutorials
    More>
    Latest Downloads
    More>
    Web Effects
    Website Source Code
    Website Materials
    Front End Template