사용자 정의 Django 명령을 사용하여 Celery 작업자를 자동으로 다시 로드

WBOY
풀어 주다: 2024-07-22 09:40:11
원래의
1314명이 탐색했습니다.

Automatically reload Celery workers with a custom Django command

Celery에는 이전에 --autoreload 플래그가 있었지만 이후 제거되었습니다. 그러나 Django에는 Manage.py runserver 명령에 자동 다시 로드 기능이 내장되어 있습니다. Celery 작업자에 자동 다시 로드가 없으면 개발 환경이 혼란스럽습니다. Python 코드를 업데이트하면 Django 서버가 현재 코드로 다시 로드되지만 서버가 실행하는 모든 작업은 Celery 작업자에서 오래된 코드를 실행하게 됩니다.

이 게시물에서는 개발 중에 Celery 작업자를 자동으로 다시 로드하는 사용자 지정manage.py runworker 명령을 작성하는 방법을 보여줍니다. 이 명령은 runserver를 모델로 하며 Django의 자동 다시 로드가 내부적으로 어떻게 작동하는지 살펴보겠습니다.

시작하기 전에

이 게시물에서는 Celery가 이미 설치된 Django 앱이 있다고 가정합니다(가이드). 또한 Django의 프로젝트와 애플리케이션의 차이점을 이해하고 있다고 가정합니다.

소스 코드 및 문서에 대한 모든 링크는 게시 당시(2024년 7월) 현재 버전의 Django 및 Celery에 대한 것입니다. 먼 미래에 이 글을 읽고 계시다면 상황이 달라졌을 수도 있습니다.

마지막으로 게시물 예제에서는 메인 프로젝트 디렉토리의 이름이 my_project로 지정됩니다.

해결 방법: 사용자 정의 명령

runworker라는 사용자 정의 Manage.py 명령을 생성하겠습니다. Django는 runserver 명령을 통해 자동 다시 로드를 제공하므로 runserver의 소스 코드를 사용자 정의 명령의 기초로 사용합니다.

프로젝트 애플리케이션 내에서 관리/명령/디렉토리를 만들어 Django에서 명령을 생성할 수 있습니다. 디렉토리가 생성되면 해당 디렉토리 내에 생성하려는 명령 이름이 포함된 Python 파일을 넣을 수 있습니다(문서).

프로젝트에 polls라는 애플리케이션이 있다고 가정하면 polls/management/commands/runworker.py에 파일을 만들고 다음 코드를 추가합니다.

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )
로그인 후 복사

중요: my_project의 모든 인스턴스를 Django 프로젝트 이름으로 바꿔야 합니다.

이 코드를 복사하여 붙여넣고 프로그래밍을 계속하려면 이 게시물의 나머지 부분을 읽지 않고도 여기서 중지해도 됩니다. 이는 Django & Celery 프로젝트를 개발할 때 도움이 될 우아한 솔루션입니다. 그러나 작동 방식에 대해 더 자세히 알고 싶다면 계속 읽어보세요.

작동 방식(선택 사항)

이 코드를 한 줄씩 검토하기보다는 주제별로 가장 흥미로운 부분을 논의하겠습니다. Django 사용자 정의 명령에 아직 익숙하지 않다면 계속하기 전에 문서를 검토하는 것이 좋습니다.

자동 재장전

이 부분이 가장 마법처럼 느껴집니다. 명령의 handler() 메소드 본문 내에는 Django의 내부 autoreload.run_with_reloader()에 대한 호출이 있습니다. 프로젝트에서 Python 파일이 변경될 때마다 실행되는 콜백 함수를 허용합니다. 그게 실제로 어떻게 작동하나요?

autoreload.run_with_reloader() 함수 소스 코드의 단순화된 버전을 살펴보겠습니다. 단순화된 함수는 코드를 다시 작성하고, 인라인하고, 삭제하여 해당 작업에 대한 명확성을 제공합니다.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)
로그인 후 복사

manage.py runworker가 명령줄에서 실행되면 먼저 run_with_reloader()를 호출하는 handler() 메서드를 호출합니다.

run_with_reloader() 내부에서는 RUN_MAIN이라는 환경 변수의 값이 "true"인지 확인합니다. 함수가 처음 호출될 때 RUN_MAIN에는 값이 없어야 합니다.

RUN_MAIN이 "true"로 설정되지 않으면 run_with_reloader()가 루프에 들어갑니다. 루프 내에서 전달된 Manage.py [command_name]을 다시 실행하는 하위 프로세스를 시작한 다음 해당 하위 프로세스가 종료될 때까지 기다립니다. 하위 프로세스가 반환 코드 3으로 종료되면 루프의 다음 반복은 새 하위 프로세스를 시작하고 기다립니다. 하위 프로세스가 3이 아닌 종료 코드를 반환할 때까지(또는 사용자가 ctrl + c를 사용하여 종료할 때까지) 루프가 실행됩니다. 3이 아닌 반환 코드를 받으면 프로그램이 완전히 종료됩니다.

생성된 하위 프로세스는 Manage.py 명령을 다시 실행하고(이 경우에는 Manage.py runworker) 명령이 다시 run_with_reloader()를 호출합니다. 이번에는 명령이 하위 프로세스에서 실행되기 때문에 RUN_MAIN이 "true"로 설정됩니다.

이제 run_with_reloader()는 자신이 하위 프로세스에 있다는 것을 알았으므로 파일 변경 사항을 감시하는 리로더를 가져오고 제공된 콜백 함수를 스레드에 넣은 다음 변경 사항 감시를 시작하는 리로더에 전달합니다.

리로더가 파일 변경을 감지하면 sys.exit(3)을 실행합니다. 그러면 하위 프로세스가 종료되고 하위 프로세스를 생성한 코드에서 루프의 다음 반복이 트리거됩니다. 그러면 업데이트된 버전의 코드를 사용하는 새로운 하위 프로세스가 시작됩니다.

시스템 점검 및 마이그레이션

기본적으로 Django 명령은 handler() 메서드를 실행하기 전에 시스템 검사를 수행합니다. 그러나 runserver 및 사용자 정의 runworker 명령의 경우 run_with_reloader()에 제공하는 콜백 내부에 들어갈 때까지 이러한 실행을 연기하려고 합니다. 우리의 경우 이것은 run_worker() 메서드입니다. 이를 통해 손상된 시스템 검사를 수정하면서 자동 다시 로드로 명령을 실행할 수 있습니다.

시스템 검사 실행을 연기하기 위해 require_system_checks 속성 값을 빈 목록으로 설정하고 run_worker() 본문에서 self.check()를 호출하여 검사를 수행합니다. runserver와 마찬가지로 사용자 정의 runworker 명령도 모든 마이그레이션이 실행되었는지 확인하고 보류 중인 마이그레이션이 있으면 경고를 표시합니다.

run_worker() 메서드 내에서 Django의 시스템 검사를 이미 수행하고 있으므로 중복 작업을 방지하기 위해 --skip-checks 플래그를 전달하여 Celery에서 시스템 검사를 비활성화합니다.

시스템 검사 및 마이그레이션과 관련된 모든 코드는 runserver 명령 소스 코드에서 직접 가져왔습니다.

celery_app.worker_main()

우리 구현에서는 Celery로 쉘링하는 대신 celery_app.worker_main()을 사용하여 Python에서 직접 Celery 작업자를 시작합니다.

on_worker_init()

이 코드는 작업자가 초기화될 때 실행되어 날짜와 시간, Django 버전, 종료 명령을 표시합니다. runserver 부팅시 표시되는 정보를 모델로 하였습니다.

기타 런서버 상용구

runserver 소스에서 다음 줄도 제거되었습니다.

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_Exception()

로그 수준

우리의 사용자 정의 명령에는 개발자가 코드를 수정하지 않고 CLI에서 설정을 조정하려는 경우를 대비해 구성 가능한 로그 수준이 있습니다.

더 나아가

이 구현을 구축하기 위해 Django & Celery의 소스 코드를 찔러봤고 이를 확장할 수 있는 기회가 많이 있습니다. 더 많은 Celery 작업자 인수를 허용하도록 명령을 구성할 수 있습니다. 또는 David Browne이 이 Gist에서 했던 것처럼 모든 셸 명령을 자동으로 다시 로드하는 사용자 정의 Manage.py 명령을 만들 수 있습니다.

이 내용이 유용하셨다면 좋아요나 댓글을 남겨주세요. 읽어주셔서 감사합니다.

위 내용은 사용자 정의 Django 명령을 사용하여 Celery 작업자를 자동으로 다시 로드의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!