So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

王林
Freigeben: 2023-05-13 15:43:06
nach vorne
2000 Leute haben es durchsucht

    1. Hintergrund

    Bei der tatsächlichen Arbeit gibt es einige zeitaufwändige asynchrone Aufgaben, die geplant werden müssen. B. das Senden von E-Mails, das Abrufen von Daten und das Ausführen geplanter Skripte Aufgabendatenspeicherung

    2. Offizielle Dokumentation für Celery zum dynamischen Hinzufügen geplanter Aufgaben

    celery-Dokumentation: https://docs.celeryproject.org/en/latest/userguide/periodic -tasks.html#beat-custom-schedulers#🎜 🎜#

    celery Beschreibung der benutzerdefinierten Planungsklasse:

    Die benutzerdefinierte Scheduler-Klasse kann in der Befehlszeile angegeben werden (Parameter --scheduler) # 🎜🎜#

    django-celery-beat Dokumentation: https://pypi.org/project/django-celery-beat/

    Anleitung zum Django-celery-beat Plug-in:

    Diese Erweiterung ermöglicht Ihnen das Speichern periodischer Aufgabenpläne in der Datenbank. Periodische Aufgaben können über die Django-Administratoroberfläche verwaltet werden, wo Sie periodische Aufgaben erstellen, bearbeiten und löschen und festlegen können, wie oft sie ausgeführt werden sollen # 🎜🎜#

    3. Sellerie ist einfach und praktisch

    3.1 Grundlegende Umgebungskonfiguration

    1. Installieren Sie die neueste Version von Django

    pip3 install django #当前我安装的版本是 3.0.6
    Nach dem Login kopieren

    2. Erstellen Sie ein Projekt#🎜 🎜#
    django-admin startproject typeidea
    django-admin startapp blog
    Nach dem Login kopieren

    3. Installieren Sie Sellerie

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis
    Nach dem Login kopieren
    3.2 Testen Sie mit der Celery-Anwendung

    # 🎜🎜#1. Erstellen Sie ein Blog-Verzeichnis und erstellen Sie eine neue task.py#🎜🎜 #

    Erstellen Sie zunächst einen Blog-Ordner im Django-Projekt und erstellen Sie das Tasks.py-Modul unter dem Blog-Ordner. wie folgt:

    Der task.py-Code lautet wie folgt:

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')
    Nach dem Login kopieren
    Celery Der erste Parameter dient zum Festlegen eines Namens Der zweite Parameter besteht darin, einen Zwischenhändler festzulegen. Hier verwenden wir Redis als Zwischenhändler. Die Funktion my_task ist eine von uns geschriebene Aufgabenfunktion, die durch Hinzufügen des Dekorators app.task in der Warteschlange des Brokers registriert wird.

    2. Redis starten und Worker erstellen

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    Jetzt erstellen wir einen Worker und warten darauf, Aufgaben in der Warteschlange zu verarbeiten.

    Geben Sie das Stammverzeichnis des Projekts ein und führen Sie den Befehl aus: celery -A celery_tasks.tasks worker -l info

    #🎜🎜 #3. Rufen Sie die Aufgabe auf

    Lassen Sie uns die Funktion testen, eine Aufgabe erstellen, sie zur Aufgabenwarteschlange hinzufügen und die Ausführung durch den Arbeiter bereitstellen.

    Geben Sie das Python-Terminal ein und führen Sie den folgenden Code aus:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>
    Nach dem Login kopieren
    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügenDer Aufruf einer Aufgabenfunktion gibt ein AsyncResult-Objekt zurück, mit dem der Status der Aufgabe überprüft oder abgerufen werden kann Der Rückgabewert der Aufgabe.

    4. Sehen Sie sich die Ergebnisse an

    Überprüfen Sie den Aufgabenausführungsstatus auf dem Terminal des Arbeiters und Sie können das 83484dfe-f729-417b-8e51 sehen wurde empfangen -6c7ae32a1377 Aufgabe und druckte die Aufgabenausführungsinformationen

    5 Speichern und zeigen Sie den Aufgabenausführungsstatus an

    Wenn Sie das Ergebnis der Aufgabenausführung ret zuweisen und dann result() aufrufen, wird ein DisabledBackend-Fehler generiert. Es ist ersichtlich, dass die Statusinformationen der Aufgabenausführung nicht gespeichert werden können, wenn der Backend-Speicher nicht konfiguriert ist Im nächsten Abschnitt werden wir darüber sprechen, wie Sie den Backend-Speicher konfigurieren

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()
    Nach dem Login kopieren

    4 #So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügenWenn wir den Status der Aufgabe verfolgen möchten, muss Celery die Ergebnisse irgendwo speichern. Es stehen mehrere Speicheroptionen zur Verfügung: SQLAlchemy, Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP).

    1. Fügen Sie den Backend-Parameter

    hinzu. In diesem Beispiel verwenden wir Redis als Lösung zum Speichern von Ergebnissen und legen ihn über den Backend-Parameter von Celery fest Speicheradresse für Aufgabenergebnisse. Wir haben das Aufgabenmodul wie folgt geändert:

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery(&#39;celery_tasks.tasks&#39;,
                 broker=&#39;redis://127.0.0.1:6379/8&#39;,
                 backend=&#39;redis://127.0.0.1:6379/9&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b
    Nach dem Login kopieren

    Den Backend-Parameter zu Celery hinzugefügt, redis als Ergebnisspeicher angegeben und die Aufgabenfunktion in zwei Parameter und einen Rückgabewert geändert. So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    2. Rufen Sie die Aufgabe auf/sehen Sie sich das Ergebnis der Aufgabenausführung an.

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False
    Nach dem Login kopieren

    Sehen wir uns die Ausführung des Arbeiters wie folgt an:

    Sie können sehen, dass die Sellerieaufgabe wurde erfolgreich ausgeführt.

    Aber das ist erst der Anfang. Der nächste Schritt besteht darin, zu sehen, wie man geplante Aufgaben hinzufügt.

    4. Optimieren Sie die Celery-Verzeichnisstruktur

    Das Obige schreibt alle Aufgaben zur Erstellung, Konfiguration und Aufgaben von Celery direkt in eine Datei, sodass das Projekt immer größer wird später ist es auch unbequem. Lassen Sie es uns aufschlüsseln und einige häufig verwendete Parameter hinzufügen.

    Die Grundstruktur ist wie folgt

    $ vim typeidea/celery.py (Sellerie-Anwendungsdatei)So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name=&#39;typeidea&#39;
    # set the default django setting module for the &#39;celery&#39; program
    os.environ.setdefault(&#39;DJANGO_SETTINGS_MODULE&#39;,&#39;typeidea.settings&#39;)
    app = Celery(project_name)
     
    app.config_from_object(&#39;django.conf:settings&#39;)
     
    app.autodiscover_tasks()
    Nach dem Login kopieren

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;
    Nach dem Login kopieren

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
    Nach dem Login kopieren

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务 celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat
    Nach dem Login kopieren

    2.在项目的 settings 文件配置 django-celery-beat

    INSTALLED_APPS = [
        &#39;blog&#39;,
        &#39;django_celery_beat&#39;,
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = &#39;zh-hans&#39;  # 使用中国语言
    TIME_ZONE = &#39;Asia/Shanghai&#39;  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = &#39;Asia/Shanghai&#39;, 则使用上海的UTC时间。
    USE_TZ = False
    Nach dem Login kopieren

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery(&#39;blog&#39;)
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        &#39;blog&#39;,
    ])
    Nach dem Login kopieren

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = &#39;redis://127.0.0.1:6379/0&#39;
    # 设置代理人broker
    BROKER_URL = &#39;redis://127.0.0.1:6379/1&#39;
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = &#39;django_celery_beat.schedulers:DatabaseScheduler&#39;
    Nach dem Login kopieren

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery(&#39;blog.tasks2&#39;,broker=&#39;redis://127.0.0.1:6379/0&#39;,backend=&#39;redis://127.0.0.1:6379/1&#39;)
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print(&#39;任务正在执行...&#39;)
        print(&#39;任务1函数休眠10s&#39;)
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print(&#39;任务2函数休眠10s&#39;)
        time.sleep(10)
    Nach dem Login kopieren

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info
    Nach dem Login kopieren

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    Nach dem Login kopieren

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>
    Nach dem Login kopieren

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name=&#39;my_task2&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    <PeriodicTask: my_task2: every 10 seconds>
    Nach dem Login kopieren

    beat 调度服务日志显示如下:

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    worker 服务日志显示如下:

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name=&#39;my_task&#39;,task=&#39;blog.tasks.my_task&#39;,args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>
    Nach dem Login kopieren

    beat 调度服务日志结果:

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    worker 服务日志结果:

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute=&#39;*&#39;,
    ... hour=&#39;*&#39;,
    ... day_of_week=&#39;*&#39;,
    ... day_of_month=&#39;*&#39;,
    ... timezone=pytz.timezone(&#39;Asia/Shanghai&#39;)
    ... )
    Nach dem Login kopieren

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name=&#39;my_task2_crontab&#39;,task=&#39;blog.tasks.my_task2&#39;,)
    Nach dem Login kopieren

    beat 调度服务执行结果

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    worker 执行服务结果

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    Nach dem Login kopieren

    控制台实际操作记录

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()
    Nach dem Login kopieren

    查看worker输出:

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()
    Nach dem Login kopieren

    查看worker输出:

    So verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen

    可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;).delete()
    >>> PeriodicTask.objects.get(name=&#39;my_task2_crontab&#39;)
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.
    Nach dem Login kopieren

    Das obige ist der detaillierte Inhalt vonSo verwenden Sie Python Celery, um geplante Aufgaben dynamisch hinzuzufügen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Verwandte Etiketten:
    Quelle:yisu.com
    Erklärung dieser Website
    Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
    Beliebte Tutorials
    Mehr>
    Neueste Downloads
    Mehr>
    Web-Effekte
    Quellcode der Website
    Website-Materialien
    Frontend-Vorlage