Celery には以前は --autoreload フラグがありましたが、現在は削除されています。ただし、Django には、manage.py runserver コマンドに自動リロード機能が組み込まれています。 Celery ワーカーに自動リロードがないため、開発エクスペリエンスが混乱します。Python コードを更新すると、Django サーバーは現在のコードでリロードされますが、サーバーが起動するタスクはすべて、Celery ワーカーで古いコードを実行します。
この投稿では、開発中に Celery ワーカーを自動的にリロードするカスタム manage.py runworker コマンドを構築する方法を説明します。このコマンドは runserver をモデルにして、Django の自動リロードが内部でどのように機能するかを見ていきます。
この投稿は、Celery がすでにインストールされている Django アプリがあることを前提としています (ガイド)。また、Django のプロジェクトとアプリケーションの違いを理解していることも前提としています。
ソース コードおよびドキュメントへのすべてのリンクは、発行時 (2024 年 7 月) の現在のバージョンの Django および Celery のものになります。あなたが遠い将来にこれを読んでいるなら、状況は変わっているかもしれません。
最後に、投稿の例では、メイン プロジェクト ディレクトリの名前は my_project になります。
runworker という名前のカスタム manage.py コマンドを作成します。 Django は、runserver コマンドを介して自動リロードを提供するため、runserver のソース コードをカスタム コマンドの基礎として使用します。
プロジェクトのアプリケーション内に manage/commands/ ディレクトリを作成することで、Django でコマンドを作成できます。ディレクトリが作成されたら、そのディレクトリ内に作成したいコマンドの名前を含む Python ファイルを置くことができます (docs)。
プロジェクトにpollsという名前のアプリケーションがあると仮定して、polls/management/commands/runworker.pyにファイルを作成し、次のコードを追加します。
# polls/management/commands/runworker.py import sys from datetime import datetime from celery.signals import worker_init from django.conf import settings from django.core.management.base import BaseCommand from django.utils import autoreload from my_project.celery import app as celery_app class Command(BaseCommand): help = "Starts a Celery worker instance with auto-reloading for development." # Validation is called explicitly each time the worker instance is reloaded. requires_system_checks = [] suppressed_base_arguments = {"--verbosity", "--traceback"} def add_arguments(self, parser): parser.add_argument( "--skip-checks", action="store_true", help="Skip system checks.", ) parser.add_argument( "--loglevel", choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"), type=str.upper, # Transforms user input to uppercase. default="INFO", ) def handle(self, *args, **options): autoreload.run_with_reloader(self.run_worker, **options) def run_worker(self, **options): # If an exception was silenced in ManagementUtility.execute in order # to be raised in the child process, raise it now. autoreload.raise_last_exception() if not options["skip_checks"]: self.stdout.write("Performing system checks...\n\n") self.check(display_num_errors=True) # Need to check migrations here, so can't use the # requires_migrations_check attribute. self.check_migrations() # Print Django info to console when the worker initializes. worker_init.connect(self.on_worker_init) # Start the Celery worker. celery_app.worker_main( [ "--app", "my_project", "--skip-checks", "worker", "--loglevel", options["loglevel"], ] ) def on_worker_init(self, sender, **kwargs): quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C" now = datetime.now().strftime("%B %d, %Y - %X") version = self.get_version() print( f"{now}\n" f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n" f"Quit the worker instance with {quit_command}.", file=self.stdout, )
重要: my_project のすべてのインスタンスを必ず Django プロジェクトの名前に置き換えてください。
このコードをコピーして貼り付けてプログラミングを続行したい場合は、この投稿の残りを読まなくても、ここで安全に終了できます。これは、Django & Celery プロジェクトを開発する際に役立つ洗練されたソリューションです。ただし、その仕組みについてさらに詳しく知りたい場合は、読み続けてください。
このコードを 1 行ずつレビューするのではなく、トピックごとに最も興味深い部分について説明します。 Django カスタム コマンドにまだ慣れていない場合は、続行する前にドキュメントを確認してください。
この部分が最も魔法のように感じられます。コマンドの handle() メソッドの本体内に、Django の内部 autoreload.run_with_reloader() への呼び出しがあります。プロジェクト内で Python ファイルが変更されるたびに実行されるコールバック関数を受け入れます。 実際はどのように機能しますか?
autoreload.run_with_reloader() 関数のソース コードの簡略化されたバージョンを見てみましょう。簡略化された関数は、コードの書き換え、インライン化、削除を行い、その動作を明確にします。
# NOTE: This has been dramatically pared down for clarity. def run_with_reloader(callback_func, *args, **kwargs): # NOTE: This will evaluate to False the first time it is run. is_inside_subprocess = os.getenv("RUN_MAIN") == "true" if is_inside_subprocess: # The reloader watches for Python file changes. reloader = get_reloader() django_main_thread = threading.Thread( target=callback_func, args=args, kwargs=kwargs ) django_main_thread.daemon = True django_main_thread.start() # When the code changes, the reloader exits with return code 3. reloader.run(django_main_thread) else: # Returns Python path and the arguments passed to the command. # Example output: ['/path/to/python', './manage.py', 'runworker'] args = get_child_arguments() subprocess_env = {**os.environ, "RUN_MAIN": "true"} while True: # Rerun the manage.py command in a subprocess. p = subprocess.run(args, env=subprocess_env, close_fds=False) if p.returncode != 3: sys.exit(p.returncode)
manage.py runworker がコマンドラインで実行されると、最初に handle() メソッドが呼び出され、その後 run_with_reloader() が呼び出されます。
run_with_reloader() 内で、RUN_MAIN という環境変数の値が「true」かどうかを確認します。関数が最初に呼び出されるとき、RUN_MAIN には値がありません。
RUN_MAIN が "true" に設定されていない場合、run_with_reloader() はループに入ります。ループ内で、渡された manage.py [command_name] を再実行するサブプロセスを開始し、そのサブプロセスが終了するのを待ちます。サブプロセスが戻りコード 3 で終了した場合、ループの次の反復で新しいサブプロセスが開始され、待機します。このループは、サブプロセスが 3 以外の終了コードを返すまで (またはユーザーが ctrl + c で終了するまで) 実行されます。 3 以外のリターン コードを取得すると、プログラムは完全に終了します。
生成されたサブプロセスは、manage.py コマンド (この場合は manage.py runworker) を再度実行し、コマンドは再度 run_with_reloader() を呼び出します。今回は、コマンドがサブプロセスで実行されているため、RUN_MAIN は「true」に設定されます。
run_with_reloader() は自分がサブプロセス内にあることを認識したので、ファイルの変更を監視するリローダーを取得し、提供されたコールバック関数をスレッドに配置して、変更の監視を開始するリローダーに渡します。
リローダーはファイルの変更を検出すると、sys.exit(3) を実行します。これによりサブプロセスが終了し、サブプロセスを生成したコードからループの次の反復がトリガーされます。次に、更新されたバージョンのコードを使用する新しいサブプロセスが起動されます。
デフォルトでは、Django コマンドは handle() メソッドを実行する前にシステム チェックを実行します。ただし、runserver とカスタム runworker コマンドの場合は、run_with_reloader() に提供するコールバック内に入るまで、これらの実行を延期する必要があります。私たちの場合、これは run_worker() メソッドです。これにより、壊れたシステム チェックを修正しながら、自動リロードでコマンドを実行できます。
システム チェックの実行を延期するには、requires_system_checks 属性の値を空のリストに設定し、run_worker() の本体で self.check() を呼び出してチェックを実行します。 runserver と同様に、カスタム runworker コマンドもすべての移行が実行されたかどうかを確認し、保留中の移行がある場合は警告を表示します。
run_worker() メソッド内で Django のシステム チェックをすでに実行しているため、重複した作業を防ぐために --skip-checks フラグを渡して Celery のシステム チェックを無効にします。
システム チェックと移行に関連するコードはすべて、runserver コマンドのソース コードから直接取得されました。
私たちの実装では、Celery にシェルアウトするのではなく、celery_app.worker_main() を使用して Python から直接 Celery ワーカーを起動します。
このコードはワーカーが初期化されるときに実行され、日付と時刻、Django のバージョン、終了するコマンドが表示されます。これは、runserver の起動時に表示される情報をモデルにしています。
次の行も runserver ソースから引用されました:
開発者がコードを変更せずに CLI から設定を調整したい場合に備えて、カスタム コマンドには構成可能なログ レベルがあります。
私はこの実装を構築するために Django と Celery のソース コードを徹底的に調べましたが、拡張する機会はたくさんあります。 Celery のワーカー引数をさらに受け入れるようにコマンドを構成できます。あるいは、David Browne がこの Gist で行ったように、任意の シェル コマンドを自動的にリロードするカスタム manage.py コマンドを作成することもできます。
これが役に立ったと思われた場合は、お気軽に「いいね!」またはコメントを残してください。読んでいただきありがとうございます。
以上がカスタム Django コマンドを使用して Celery ワーカーを自動的にリロードするの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。