目錄
apscheduler 模組
apscheduler 模組介紹
支援的後端儲存作業
APScheduler有四個組成部分
各元件簡介
触发器类型
首頁 後端開發 Python教學 如何安裝並使用Python中的第三方模組apscheduler?

如何安裝並使用Python中的第三方模組apscheduler?

May 08, 2023 am 10:04 AM
python apscheduler

    apscheduler 模組

    安裝apscheduler 模組

    pip install apscheduler
    登入後複製

    apscheduler 模組介紹

    #APScheduler( Advanced Python Scheduler)是一個輕量級的Python定時任務排程框架(Python函式庫)。

    APScheduler有三個內建的調度系統,其中包括:

    • cron式調度(可選開始/結束時間)

    • 基於間隔的執行(以偶數間隔運行作業,也可以選擇開始/結束時間)

    • #一次延遲執行任務(在指定的日期/時間內執行一次作業)

    支援的後端儲存作業

    APScheduler可以任意混合和匹配調度系統和作業儲存的後端,其中支援後端儲存作業包括:

    • Memory

    • SQLAlchemy

    • MongoDB

    • Redis

    • RethinkDB

    • ZooKeeper

    APScheduler有四個組成部分

    • #triggers(觸發器)中包含調度邏輯,每個作業都由自己的觸發器來決定下次運行時間。除了他們自己初始配置意外,觸發器完全是無狀態的。

    • job stores(作業記憶體)儲存已排程的作業,預設的作業記憶體只是簡單地把作業保存在記憶體中,其他的作業記憶體則是將作業儲存在資料庫中。當作業被儲存到一個持久的作業記憶體中的時候,該作業的資料會被序列化,並在載入時被反序列化。作業記憶體不能共用調度器。

    • executors(執行器)處理作業的運行,他們通常透過在作業中提交指定的可調用物件到一個執行緒或進程池來進行。當作業完成時,執行器將會通知調度器。

    • schedulers(調度器)配置作業記憶體和執行器可以在調度器中完成,例如新增、修改和移除作業。根據不同的應用場景可以選用不同的調度器,可選的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7種。

    各元件簡介

    觸發器

    當你排程作業的時候,你需要為這個作業選擇一個觸發器,用來描述這個作業何時被觸發,APScheduler有三種內建的觸發器類型:

    • date: 一次指定日期;

    • interval: 在某個時間範圍內間隔多久執行一次;

    • cron :Linux crontab格式相容,最強大。

    date 最基本的一種調度,作業只會執行一次。它的參數如下:

    1.run_date
    (datetime|str) – 作業的運作日期或時間

    2.timezone
    (datetime.tzinfo|str) &ndash ; 指定時區

    作業記憶體

    如果你的應用程式在每次啟動的時候都會重新建立作業,那麼使用預設的作業記憶體(MemoryJobStore)即可,但是如果你需要在調度器重新啟動或應用程式奔潰的情況下任然保留作業,你應該根據你的應用程式環境來選擇特定的作業記憶體。例如:使用Mongo或SQLAlchemy JobStore (用於支援大多數RDBMS)

    執行器

    對執行器的選擇取決於你使用上面哪些框架,大多數情況下,使用預設的ThreadPoolExecutor已經能夠滿足需求。如果你的應用程式涉及CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時使用兩者,將ProcessPoolExecutor當作第二致動器。

    選擇適當的調度器

    • BlockingScheduler : 當調度器是你應用程式中唯一要執行的東西時

    • #BackgroundScheduler : 當你沒有執行任何其他框架並希望調度器在你應用的背景執行時使用。

    • AsyncIOScheduler : 當你的程式使用了asyncio(一個非同步框架)的時候使用。

    • GeventScheduler : 當你的程式使用了gevent(高效能的Python並發框架)的時候使用。

    • TornadoScheduler : 當你的程式是基於Tornado(一個網頁框架)的時候使用。

    • TwistedScheduler : 當你的程式使用了Twisted(一個非同步框架)的時候使用

    • ##QtScheduler : 如果你的應用程式是一個Qt應用的時候可以使用。

    apscheduler 模組使用

    新增作業

    #有兩種方式可以新增一個新的作業:

    add_job來新增作業;

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job1():
        print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    def my_job2():
        print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    # 每隔5秒运行一次my_job1
    sched.add_job(my_job1, 'interval', seconds=5, id='my_job1')
    # 每隔5秒运行一次my_job2
    sched.add_job(my_job2, 'cron', second='*/5', id='my_job2')
    sched.start()
    登入後複製

    裝飾器模式新增作業。

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    sched = BlockingScheduler()
    
    # 每隔5秒运行一次my_job1
    @sched.scheduled_job('interval', seconds=5, id='my_job1')
    def my_job1():
        print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    # 每隔5秒运行一次my_job2
    @sched.scheduled_job('cron', second='*/5', id='my_job2')
    def my_job2():
        print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched.start()
    登入後複製

    移除作業

    沒有移除作業

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    sched.start()
    登入後複製

    程式碼執行結果:

    如何安裝並使用Python中的第三方模組apscheduler?

    使用remove() 移除作业

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    job.remove()
    # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    sched.start()
    登入後複製

    代码执行结果:

    如何安裝並使用Python中的第三方模組apscheduler?

    使用remove_job()移除作业

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    # #如果有多个任务序列的话可以给每个任务设置ID号,可以根据ID号选择清除对象,且remove放到start前才有效
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    sched.remove_job('my_job_id')
    sched.start()
    登入後複製

    代码执行结果:

    如何安裝並使用Python中的第三方模組apscheduler?

    触发器类型

    APScheduler有3中内置的触发器类型:

    • 新建一个调度器(scheduler);

    • 添加一个调度任务(job store);

    • 运行调度任务。

    代码实现

    # -*- coding:utf-8 -*-
    import time
    import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    def my_job(text="默认值"):
        print(text, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
    sched = BlockingScheduler()
    sched.add_job(my_job, 'interval', seconds=3, args=['3秒定时'])
    # 2018-3-17 00:00:00 执行一次,args传递一个text参数
    sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根据年月日定时执行'])
    # 2018-3-17 13:46:00 执行一次,args传递一个text参数
    sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根据年月日时分秒定时执行'])
    # sched.start()
    """
    interval 间隔调度,参数如下:
        weeks (int) – 间隔几周
        days (int) – 间隔几天
        hours (int) – 间隔几小时
        minutes (int) – 间隔几分钟
        seconds (int) – 间隔多少秒
        start_date (datetime|str) – 开始日期
        end_date (datetime|str) – 结束日期
        timezone (datetime.tzinfo|str) – 时区
    """
    """
    cron参数如下:
        year (int|str) – 年,4位数字
        month (int|str) – 月 (范围1-12)
        day (int|str) – 日 (范围1-31)
        week (int|str) – 周 (范围1-53)
        day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
        hour (int|str) – 时 (范围0-23)
        minute (int|str) – 分 (范围0-59)
        second (int|str) – 秒 (范围0-59)
        start_date (datetime|str) – 最早开始日期(包含)
        end_date (datetime|str) – 最晚结束时间(包含)
        timezone (datetime.tzinfo|str) – 指定时区
    """
    # my_job将会在6,7,8,11,12月的第3个周五的1,2,3点运行
    sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
    # 截止到2018-12-30 00:00:00,每周一到周五早上五点半运行job_function
    sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31')
    
    # 表示2017年3月22日17时19分07秒执行该程序
    sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
    
    # 表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
    sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
    
    # 表示从星期一到星期五5:30(AM)直到2014-05-30 00:00:00
    sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
    
    # 表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
    sched.add_job(my_job, 'cron', second='*/5', args=['5秒定时'])
    sched.start()
    登入後複製
    cron表达式参数描述
    *anyFire on every value
    */aanyFire every a values, starting from the minimum
    a-banyFire on any value within the a-b range (a must be smaller than b)
    a-b/canyFire every c values within the a-b range
    xth ydayFire on the x -th occurrence of weekday y within the month
    last xdayFire on the last occurrence of weekday x within the month
    lastdayFire on the last day within the month
    x,y,zanyFire on any matching expression; can combine any number of any of the above expressions

    使用SQLAlchemy作业存储器存放作业

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime
    import logging
    sched = BlockingScheduler()
    
    def my_job():
        print('my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    # 使用sqlalchemy作业存储器
    # 根据自己电脑安装的库选择用什么连接 ,如pymysql   其中:scrapy表示数据库的名称,操作数据库之前应创建对应的数据库
    url = 'mysql+pymysql://root:123456@localhost:3306/scrapy?charset=utf8'
    sched.add_jobstore('sqlalchemy', url=url)
    # 添加作业
    sched.add_job(my_job, 'interval', id='myjob', seconds=5)
    
    log = logging.getLogger('apscheduler.executors.default')
    log.setLevel(logging.INFO)  # DEBUG
    # 设定日志格式
    fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
    h = logging.StreamHandler()
    h.setFormatter(fmt)
    log.addHandler(h)
    
    sched.start()
    登入後複製

    暂停和恢复作业

    # 暂停作业:
    apsched.job.Job.pause()
    apsched.schedulers.base.BaseScheduler.pause_job()
    # 恢复作业:
    apsched.job.Job.resume()
    apsched.schedulers.base.BaseScheduler.resume_job()
    登入後複製

    获得job列表

    • get_jobs(),它会返回所有的job实例;

    • 使用print_jobs()来输出所有格式化的作业列表;

    • get_job(job_id=“任务ID”)获取指定任务的作业列表。

    代码实现:

    # -*- coding:utf-8 -*-
    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime
    
    def my_job(text=""):
        print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    sched = BlockingScheduler()
    
    job = sched.add_job(my_job, 'interval', seconds=2, args=['第一个作业'])
    sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二个作业'])
    
    print(sched.get_jobs())
    
    print(sched.get_job(job_id="my_job_id"))
    
    sched.print_jobs()
    sched.start()
    登入後複製

    关闭调度器

    默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。

    sched.shutdown()
    sched.shutdown(wait=False)
    登入後複製

    以上是如何安裝並使用Python中的第三方模組apscheduler?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    本網站聲明
    本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

    熱AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智慧驅動的應用程序,用於創建逼真的裸體照片

    AI Clothes Remover

    AI Clothes Remover

    用於從照片中去除衣服的線上人工智慧工具。

    Undress AI Tool

    Undress AI Tool

    免費脫衣圖片

    Clothoff.io

    Clothoff.io

    AI脫衣器

    Video Face Swap

    Video Face Swap

    使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

    熱工具

    記事本++7.3.1

    記事本++7.3.1

    好用且免費的程式碼編輯器

    SublimeText3漢化版

    SublimeText3漢化版

    中文版,非常好用

    禪工作室 13.0.1

    禪工作室 13.0.1

    強大的PHP整合開發環境

    Dreamweaver CS6

    Dreamweaver CS6

    視覺化網頁開發工具

    SublimeText3 Mac版

    SublimeText3 Mac版

    神級程式碼編輯軟體(SublimeText3)

    PHP和Python:解釋了不同的範例 PHP和Python:解釋了不同的範例 Apr 18, 2025 am 12:26 AM

    PHP主要是過程式編程,但也支持面向對象編程(OOP);Python支持多種範式,包括OOP、函數式和過程式編程。 PHP適合web開發,Python適用於多種應用,如數據分析和機器學習。

    在PHP和Python之間進行選擇:指南 在PHP和Python之間進行選擇:指南 Apr 18, 2025 am 12:24 AM

    PHP適合網頁開發和快速原型開發,Python適用於數據科學和機器學習。 1.PHP用於動態網頁開發,語法簡單,適合快速開發。 2.Python語法簡潔,適用於多領域,庫生態系統強大。

    Python vs. JavaScript:學習曲線和易用性 Python vs. JavaScript:學習曲線和易用性 Apr 16, 2025 am 12:12 AM

    Python更適合初學者,學習曲線平緩,語法簡潔;JavaScript適合前端開發,學習曲線較陡,語法靈活。 1.Python語法直觀,適用於數據科學和後端開發。 2.JavaScript靈活,廣泛用於前端和服務器端編程。

    PHP和Python:深入了解他們的歷史 PHP和Python:深入了解他們的歷史 Apr 18, 2025 am 12:25 AM

    PHP起源於1994年,由RasmusLerdorf開發,最初用於跟踪網站訪問者,逐漸演變為服務器端腳本語言,廣泛應用於網頁開發。 Python由GuidovanRossum於1980年代末開發,1991年首次發布,強調代碼可讀性和簡潔性,適用於科學計算、數據分析等領域。

    vs code 可以在 Windows 8 中運行嗎 vs code 可以在 Windows 8 中運行嗎 Apr 15, 2025 pm 07:24 PM

    VS Code可以在Windows 8上運行,但體驗可能不佳。首先確保系統已更新到最新補丁,然後下載與系統架構匹配的VS Code安裝包,按照提示安裝。安裝後,注意某些擴展程序可能與Windows 8不兼容,需要尋找替代擴展或在虛擬機中使用更新的Windows系統。安裝必要的擴展,檢查是否正常工作。儘管VS Code在Windows 8上可行,但建議升級到更新的Windows系統以獲得更好的開發體驗和安全保障。

    visual studio code 可以用於 python 嗎 visual studio code 可以用於 python 嗎 Apr 15, 2025 pm 08:18 PM

    VS Code 可用於編寫 Python,並提供許多功能,使其成為開發 Python 應用程序的理想工具。它允許用戶:安裝 Python 擴展,以獲得代碼補全、語法高亮和調試等功能。使用調試器逐步跟踪代碼,查找和修復錯誤。集成 Git,進行版本控制。使用代碼格式化工具,保持代碼一致性。使用 Linting 工具,提前發現潛在問題。

    notepad 怎麼運行python notepad 怎麼運行python Apr 16, 2025 pm 07:33 PM

    在 Notepad 中運行 Python 代碼需要安裝 Python 可執行文件和 NppExec 插件。安裝 Python 並為其添加 PATH 後,在 NppExec 插件中配置命令為“python”、參數為“{CURRENT_DIRECTORY}{FILE_NAME}”,即可在 Notepad 中通過快捷鍵“F6”運行 Python 代碼。

    vscode 擴展是否是惡意的 vscode 擴展是否是惡意的 Apr 15, 2025 pm 07:57 PM

    VS Code 擴展存在惡意風險,例如隱藏惡意代碼、利用漏洞、偽裝成合法擴展。識別惡意擴展的方法包括:檢查發布者、閱讀評論、檢查代碼、謹慎安裝。安全措施還包括:安全意識、良好習慣、定期更新和殺毒軟件。

    See all articles