目錄
一、背景
二、Celery動態新增定時任務的官方文件
三、celery簡單實用
#3.1 基礎環境配置
3.2 測試使用Celery應用程式
四、設定backend儲存任務執行結果 
四、優化Celery目錄結構
五、开始使用django-celery-beat调度器
六、具体操作演练
6.1 创建基于间隔时间的周期性任务
6.2 创建一个不带参数的周期性间隔任务
6.3 周期性任务的查询、删除操作
首頁 後端開發 Python教學 怎麼用Python Celery動態加入定時任務

怎麼用Python Celery動態加入定時任務

May 13, 2023 pm 03:43 PM
python celery

    一、背景

    在實際工作中會有一些耗時的非同步任務需要使用定時調度,例如發送郵件,拉取數據,執行定時腳本

    透過celery 實現調度主要思想是透過引入中間人redis,啟動worker 進行任務執行 ,celery-beat進行定時任務資料儲存

    二、Celery動態新增定時任務的官方文件

    celery文件:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自訂排程類別說明: 

    自訂調度器類別可以在命令列中指定(--scheduler參數)

    django-celery-beat文件: https://pypi.org/project/django-celery-beat/

    關於django-celery-beat 外掛程式的說明: 

    此擴充功能可讓您將定期任務計畫儲存在資料庫中,可以從Django 管理介面管理週期性任務,您可以在其中建立、編輯和刪除週期性任務以及它們應該運行的頻率

    三、celery簡單實用

    #3.1 基礎環境配置

    1. 安裝最新版本的Django

    pip3 install django #当前我安装的版本是 3.0.6
    登入後複製

    2. 建立專案

    django-admin startproject typeidea
    django-admin startapp blog
    登入後複製

    3.安裝celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis
    登入後複製

    3.2 測試使用Celery應用程式

    #1. 創建blog目錄、新建task.py

    首先在Django專案中建立一個blog資料夾,並且在blog資料夾下建立tasks.py模組, 如下:

    怎麼用Python Celery動態加入定時任務

     tasks.py代碼如下: 

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')
    登入後複製

    Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這裡我們使用Redis作為中間人。 my_task函數是我們寫的一個任務函數, 透過加上裝飾器app.task, 將其註冊到broker的佇列中。

    2. 啟動redis、建立worker

    現在我們在建立一個worker, 等待處理佇列中的任務。

    進入專案的根目錄,執行指令: celery -A celery_tasks.tasks worker -l info

    怎麼用Python Celery動態加入定時任務

     3. 呼叫任務

    下面來測試一下功能,建立一個任務,加入任務佇列中,提供worker執行。

    進入python終端, 執行如下程式碼:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>
    登入後複製

    呼叫一個任務函數,將會傳回一個AsyncResult對象,這個物件可以用來檢查任務的狀態或是取得任務的回傳值。

    4. 查看結果

    在worker的終端機查看任務執行情況,可以看到已經收到83484dfe-f729-417b-8e51-6c7ae32a1377 任務,並列印了任務執行資訊

    怎麼用Python Celery動態加入定時任務

    5. 儲存並檢視任務執行狀態

    把任務執行結果賦值給ret,然後呼叫result () 會產生 DisabledBackend 報錯,可見沒有設定後端儲存的時候並不能儲存任務執行的狀態訊息,下一節我們會講到如何設定backend儲存任務執行結果

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()
    登入後複製

    怎麼用Python Celery動態加入定時任務

    四、設定backend儲存任務執行結果 

    如果我們想追蹤任務的狀態,Celery需要將結果儲存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    1. 新增backend參數

    在本例中我們使用Redis作為儲存結果的方案,透過Celery的backend參數來設定任務結果儲存位址。我們將tasks模組修改如下:

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b
    登入後複製

    給Celery增加了backend參數,指定redis作為結果存儲,並將任務函數修改為兩個參數,並且有返回值。

    2. 呼叫任務/查看任務執行結果

    下面再來執行呼叫一下這個任務看看。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False
    登入後複製

    再來看看worker的執行情況,如下:

    怎麼用Python Celery動態加入定時任務

    #可以看到celery任務已經執行成功了。

    但這只是一個開始,下一步要看看如何新增定時的任務。

    四、優化Celery目錄結構

    上面直接將Celery的應用程式建立、配置、tasks任務全部寫在了一個文件,這樣在後面專案越來越大,也是不方便的。下面來拆分一下,並且加入一些常用的參數。

    基本架構如下

    怎麼用Python Celery動態加入定時任務

    $ vim typeidea/celery.py (Celery應用程式檔案)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()
    登入後複製

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;
    登入後複製

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
    登入後複製

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务 celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat
    登入後複製

    2.在项目的 settings 文件配置 django-celery-beat

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False
    登入後複製

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    怎麼用Python Celery動態加入定時任務

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])
    登入後複製

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;
    登入後複製

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)
    登入後複製

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info
    登入後複製

    怎麼用Python Celery動態加入定時任務

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    登入後複製

    怎麼用Python Celery動態加入定時任務

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>
    登入後複製

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>
    登入後複製

    beat 调度服务日志显示如下:

    怎麼用Python Celery動態加入定時任務

    worker 服务日志显示如下:

    怎麼用Python Celery動態加入定時任務

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>
    登入後複製

    beat 调度服务日志结果:

    怎麼用Python Celery動態加入定時任務

    worker 服务日志结果:

    怎麼用Python Celery動態加入定時任務

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )
    登入後複製

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    登入後複製

    beat 调度服务执行结果

    怎麼用Python Celery動態加入定時任務

    worker 执行服务结果

    怎麼用Python Celery動態加入定時任務

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    登入後複製

    控制台实际操作记录

    怎麼用Python Celery動態加入定時任務

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()
    登入後複製

    查看worker输出:

    怎麼用Python Celery動態加入定時任務

    可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()
    登入後複製

    查看worker输出:

    怎麼用Python Celery動態加入定時任務

    可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.
    登入後複製

    以上是怎麼用Python Celery動態加入定時任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    本網站聲明
    本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

    熱AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智慧驅動的應用程序,用於創建逼真的裸體照片

    AI Clothes Remover

    AI Clothes Remover

    用於從照片中去除衣服的線上人工智慧工具。

    Undress AI Tool

    Undress AI Tool

    免費脫衣圖片

    Clothoff.io

    Clothoff.io

    AI脫衣器

    Video Face Swap

    Video Face Swap

    使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

    熱工具

    記事本++7.3.1

    記事本++7.3.1

    好用且免費的程式碼編輯器

    SublimeText3漢化版

    SublimeText3漢化版

    中文版,非常好用

    禪工作室 13.0.1

    禪工作室 13.0.1

    強大的PHP整合開發環境

    Dreamweaver CS6

    Dreamweaver CS6

    視覺化網頁開發工具

    SublimeText3 Mac版

    SublimeText3 Mac版

    神級程式碼編輯軟體(SublimeText3)

    PHP和Python:解釋了不同的範例 PHP和Python:解釋了不同的範例 Apr 18, 2025 am 12:26 AM

    PHP主要是過程式編程,但也支持面向對象編程(OOP);Python支持多種範式,包括OOP、函數式和過程式編程。 PHP適合web開發,Python適用於多種應用,如數據分析和機器學習。

    在PHP和Python之間進行選擇:指南 在PHP和Python之間進行選擇:指南 Apr 18, 2025 am 12:24 AM

    PHP適合網頁開發和快速原型開發,Python適用於數據科學和機器學習。 1.PHP用於動態網頁開發,語法簡單,適合快速開發。 2.Python語法簡潔,適用於多領域,庫生態系統強大。

    PHP和Python:深入了解他們的歷史 PHP和Python:深入了解他們的歷史 Apr 18, 2025 am 12:25 AM

    PHP起源於1994年,由RasmusLerdorf開發,最初用於跟踪網站訪問者,逐漸演變為服務器端腳本語言,廣泛應用於網頁開發。 Python由GuidovanRossum於1980年代末開發,1991年首次發布,強調代碼可讀性和簡潔性,適用於科學計算、數據分析等領域。

    Python vs. JavaScript:學習曲線和易用性 Python vs. JavaScript:學習曲線和易用性 Apr 16, 2025 am 12:12 AM

    Python更適合初學者,學習曲線平緩,語法簡潔;JavaScript適合前端開發,學習曲線較陡,語法靈活。 1.Python語法直觀,適用於數據科學和後端開發。 2.JavaScript靈活,廣泛用於前端和服務器端編程。

    vs code 可以在 Windows 8 中運行嗎 vs code 可以在 Windows 8 中運行嗎 Apr 15, 2025 pm 07:24 PM

    VS Code可以在Windows 8上運行,但體驗可能不佳。首先確保系統已更新到最新補丁,然後下載與系統架構匹配的VS Code安裝包,按照提示安裝。安裝後,注意某些擴展程序可能與Windows 8不兼容,需要尋找替代擴展或在虛擬機中使用更新的Windows系統。安裝必要的擴展,檢查是否正常工作。儘管VS Code在Windows 8上可行,但建議升級到更新的Windows系統以獲得更好的開發體驗和安全保障。

    sublime怎麼運行代碼python sublime怎麼運行代碼python Apr 16, 2025 am 08:48 AM

    在 Sublime Text 中運行 Python 代碼,需先安裝 Python 插件,再創建 .py 文件並編寫代碼,最後按 Ctrl B 運行代碼,輸出會在控制台中顯示。

    visual studio code 可以用於 python 嗎 visual studio code 可以用於 python 嗎 Apr 15, 2025 pm 08:18 PM

    VS Code 可用於編寫 Python,並提供許多功能,使其成為開發 Python 應用程序的理想工具。它允許用戶:安裝 Python 擴展,以獲得代碼補全、語法高亮和調試等功能。使用調試器逐步跟踪代碼,查找和修復錯誤。集成 Git,進行版本控制。使用代碼格式化工具,保持代碼一致性。使用 Linting 工具,提前發現潛在問題。

    vscode在哪寫代碼 vscode在哪寫代碼 Apr 15, 2025 pm 09:54 PM

    在 Visual Studio Code(VSCode)中編寫代碼簡單易行,只需安裝 VSCode、創建項目、選擇語言、創建文件、編寫代碼、保存並運行即可。 VSCode 的優點包括跨平台、免費開源、強大功能、擴展豐富,以及輕量快速。

    See all articles