目錄
一、背景
二、Celery動態新增定時任務的官方文件
三、celery簡單實用
#3.1 基礎環境配置
3.2 測試使用Celery應用程式
四、設定backend儲存任務執行結果 
四、優化Celery目錄結構
五、开始使用django-celery-beat调度器
六、具体操作演练
6.1 创建基于间隔时间的周期性任务
6.2 创建一个不带参数的周期性间隔任务
6.3 周期性任务的查询、删除操作
首頁 後端開發 Python教學 怎麼用Python Celery動態加入定時任務

怎麼用Python Celery動態加入定時任務

May 13, 2023 pm 03:43 PM
python celery

    一、背景

    在實際工作中會有一些耗時的非同步任務需要使用定時調度,例如發送郵件,拉取數據,執行定時腳本

    透過celery 實現調度主要思想是透過引入中間人redis,啟動worker 進行任務執行 ,celery-beat進行定時任務資料儲存

    二、Celery動態新增定時任務的官方文件

    celery文件:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自訂排程類別說明: 

    自訂調度器類別可以在命令列中指定(--scheduler參數)

    django-celery-beat文件: https://pypi.org/project/django-celery-beat/

    關於django-celery-beat 外掛程式的說明: 

    此擴充功能可讓您將定期任務計畫儲存在資料庫中,可以從Django 管理介面管理週期性任務,您可以在其中建立、編輯和刪除週期性任務以及它們應該運行的頻率

    三、celery簡單實用

    #3.1 基礎環境配置

    1. 安裝最新版本的Django

    pip3 install django #当前我安装的版本是 3.0.6
    登入後複製

    2. 建立專案

    django-admin startproject typeidea
    django-admin startapp blog
    登入後複製

    3.安裝celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis
    登入後複製

    3.2 測試使用Celery應用程式

    #1. 創建blog目錄、新建task.py

    首先在Django專案中建立一個blog資料夾,並且在blog資料夾下建立tasks.py模組, 如下:

    怎麼用Python Celery動態加入定時任務

     tasks.py代碼如下: 

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')
    登入後複製

    Celery第一個參數是給其設定一個名字, 第二參數我們設定一個中間人broker, 在這裡我們使用Redis作為中間人。 my_task函數是我們寫的一個任務函數, 透過加上裝飾器app.task, 將其註冊到broker的佇列中。

    2. 啟動redis、建立worker

    現在我們在建立一個worker, 等待處理佇列中的任務。

    進入專案的根目錄,執行指令: celery -A celery_tasks.tasks worker -l info

    怎麼用Python Celery動態加入定時任務

     3. 呼叫任務

    下面來測試一下功能,建立一個任務,加入任務佇列中,提供worker執行。

    進入python終端, 執行如下程式碼:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>
    登入後複製

    呼叫一個任務函數,將會傳回一個AsyncResult對象,這個物件可以用來檢查任務的狀態或是取得任務的回傳值。

    4. 查看結果

    在worker的終端機查看任務執行情況,可以看到已經收到83484dfe-f729-417b-8e51-6c7ae32a1377 任務,並列印了任務執行資訊

    怎麼用Python Celery動態加入定時任務

    5. 儲存並檢視任務執行狀態

    把任務執行結果賦值給ret,然後呼叫result () 會產生 DisabledBackend 報錯,可見沒有設定後端儲存的時候並不能儲存任務執行的狀態訊息,下一節我們會講到如何設定backend儲存任務執行結果

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()
    登入後複製

    怎麼用Python Celery動態加入定時任務

    四、設定backend儲存任務執行結果 

    如果我們想追蹤任務的狀態,Celery需要將結果儲存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    1. 新增backend參數

    在本例中我們使用Redis作為儲存結果的方案,透過Celery的backend參數來設定任務結果儲存位址。我們將tasks模組修改如下:

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b
    登入後複製

    給Celery增加了backend參數,指定redis作為結果存儲,並將任務函數修改為兩個參數,並且有返回值。

    2. 呼叫任務/查看任務執行結果

    下面再來執行呼叫一下這個任務看看。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False
    登入後複製

    再來看看worker的執行情況,如下:

    怎麼用Python Celery動態加入定時任務

    #可以看到celery任務已經執行成功了。

    但這只是一個開始,下一步要看看如何新增定時的任務。

    四、優化Celery目錄結構

    上面直接將Celery的應用程式建立、配置、tasks任務全部寫在了一個文件,這樣在後面專案越來越大,也是不方便的。下面來拆分一下,並且加入一些常用的參數。

    基本架構如下

    怎麼用Python Celery動態加入定時任務

    $ vim typeidea/celery.py (Celery應用程式檔案)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()
    登入後複製

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;
    登入後複製

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
    登入後複製

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务 celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat
    登入後複製

    2.在项目的 settings 文件配置 django-celery-beat

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False
    登入後複製

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    怎麼用Python Celery動態加入定時任務

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])
    登入後複製

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;
    登入後複製

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)
    登入後複製

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info
    登入後複製

    怎麼用Python Celery動態加入定時任務

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    登入後複製

    怎麼用Python Celery動態加入定時任務

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>
    登入後複製

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>
    登入後複製

    beat 调度服务日志显示如下:

    怎麼用Python Celery動態加入定時任務

    worker 服务日志显示如下:

    怎麼用Python Celery動態加入定時任務

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>
    登入後複製

    beat 调度服务日志结果:

    怎麼用Python Celery動態加入定時任務

    worker 服务日志结果:

    怎麼用Python Celery動態加入定時任務

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )
    登入後複製

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    登入後複製

    beat 调度服务执行结果

    怎麼用Python Celery動態加入定時任務

    worker 执行服务结果

    怎麼用Python Celery動態加入定時任務

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    登入後複製

    控制台实际操作记录

    怎麼用Python Celery動態加入定時任務

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()
    登入後複製

    查看worker输出:

    怎麼用Python Celery動態加入定時任務

    可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()
    登入後複製

    查看worker输出:

    怎麼用Python Celery動態加入定時任務

    可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.
    登入後複製

    以上是怎麼用Python Celery動態加入定時任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    本網站聲明
    本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

    熱AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智慧驅動的應用程序,用於創建逼真的裸體照片

    AI Clothes Remover

    AI Clothes Remover

    用於從照片中去除衣服的線上人工智慧工具。

    Undress AI Tool

    Undress AI Tool

    免費脫衣圖片

    Clothoff.io

    Clothoff.io

    AI脫衣器

    AI Hentai Generator

    AI Hentai Generator

    免費產生 AI 無盡。

    熱門文章

    R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
    2 週前 By 尊渡假赌尊渡假赌尊渡假赌
    倉庫:如何復興隊友
    4 週前 By 尊渡假赌尊渡假赌尊渡假赌
    Hello Kitty Island冒險:如何獲得巨型種子
    3 週前 By 尊渡假赌尊渡假赌尊渡假赌

    熱工具

    記事本++7.3.1

    記事本++7.3.1

    好用且免費的程式碼編輯器

    SublimeText3漢化版

    SublimeText3漢化版

    中文版,非常好用

    禪工作室 13.0.1

    禪工作室 13.0.1

    強大的PHP整合開發環境

    Dreamweaver CS6

    Dreamweaver CS6

    視覺化網頁開發工具

    SublimeText3 Mac版

    SublimeText3 Mac版

    神級程式碼編輯軟體(SublimeText3)

    模板化的優點和缺點有哪些? 模板化的優點和缺點有哪些? May 08, 2024 pm 03:51 PM

    模板化的優點和缺點有哪些?

    Google AI 為開發者發佈 Gemini 1.5 Pro 和 Gemma 2 Google AI 為開發者發佈 Gemini 1.5 Pro 和 Gemma 2 Jul 01, 2024 am 07:22 AM

    Google AI 為開發者發佈 Gemini 1.5 Pro 和 Gemma 2

    怎麼下載deepseek 小米 怎麼下載deepseek 小米 Feb 19, 2025 pm 05:27 PM

    怎麼下載deepseek 小米

    分享幾個.NET開源的AI和LLM相關專案框架 分享幾個.NET開源的AI和LLM相關專案框架 May 06, 2024 pm 04:43 PM

    分享幾個.NET開源的AI和LLM相關專案框架

    deepseek怎麼問他 deepseek怎麼問他 Feb 19, 2025 pm 04:42 PM

    deepseek怎麼問他

    evaluate函數怎麼保存 evaluate函數怎麼保存 May 07, 2024 am 01:09 AM

    evaluate函數怎麼保存

    NET40是什麼軟體 NET40是什麼軟體 May 10, 2024 am 01:12 AM

    NET40是什麼軟體

    deepseek該怎麼搜索 deepseek該怎麼搜索 Feb 19, 2025 pm 05:18 PM

    deepseek該怎麼搜索

    See all articles