Celery 之前有一個 --autoreload 標誌,現已刪除。然而,Django 在其manage.py runserver 指令中內建了自動重新載入功能。 Celery 工作執行緒中缺乏自動重新加載會造成令人困惑的開發體驗:更新 Python 程式碼會導致 Django 伺服器使用當前程式碼重新加載,但伺服器觸發的任何任務都將在 Celery 工作執行緒中執行過時的程式碼。
這篇文章將向您展示如何建立一個自訂的 manage.py runworker 命令,該命令在開發過程中自動重新載入 Celery 工作執行緒。這個指令將模仿 runserver,我們將看看 Django 的自動重新載入是如何在幕後工作的。
這篇文章假設您有一個已經安裝了 Celery 的 Django 應用程式(指南)。它還假設您了解 Django 中的項目和應用程式之間的差異。
所有原始碼和文件連結均適用於發佈時(2024 年 7 月)目前版本的 Django 和 Celery。如果您在遙遠的將來閱讀本文,事情可能會改變。
最後,主專案目錄將在貼文的範例中命名為 my_project。
我們將建立一個名為 runworker 的自訂管理.py 指令。由於 Django 透過其 runsever 命令提供自動重新加載,因此我們將使用 runserver 的原始程式碼作為自訂命令的基礎。
您可以透過在專案的任何應用程式中建立 management/commands/ 目錄來在 Django 中建立命令。建立目錄後,您可以在該目錄 (docs) 中放置一個帶有您要建立的命令名稱的 Python 檔案。
假設您的專案有一個名為 polls 的應用程序,我們將在 polls/management/commands/runworker.py 中建立一個檔案並添加以下程式碼:
# polls/management/commands/runworker.py import sys from datetime import datetime from celery.signals import worker_init from django.conf import settings from django.core.management.base import BaseCommand from django.utils import autoreload from my_project.celery import app as celery_app class Command(BaseCommand): help = "Starts a Celery worker instance with auto-reloading for development." # Validation is called explicitly each time the worker instance is reloaded. requires_system_checks = [] suppressed_base_arguments = {"--verbosity", "--traceback"} def add_arguments(self, parser): parser.add_argument( "--skip-checks", action="store_true", help="Skip system checks.", ) parser.add_argument( "--loglevel", choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"), type=str.upper, # Transforms user input to uppercase. default="INFO", ) def handle(self, *args, **options): autoreload.run_with_reloader(self.run_worker, **options) def run_worker(self, **options): # If an exception was silenced in ManagementUtility.execute in order # to be raised in the child process, raise it now. autoreload.raise_last_exception() if not options["skip_checks"]: self.stdout.write("Performing system checks...\n\n") self.check(display_num_errors=True) # Need to check migrations here, so can't use the # requires_migrations_check attribute. self.check_migrations() # Print Django info to console when the worker initializes. worker_init.connect(self.on_worker_init) # Start the Celery worker. celery_app.worker_main( [ "--app", "my_project", "--skip-checks", "worker", "--loglevel", options["loglevel"], ] ) def on_worker_init(self, sender, **kwargs): quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C" now = datetime.now().strftime("%B %d, %Y - %X") version = self.get_version() print( f"{now}\n" f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n" f"Quit the worker instance with {quit_command}.", file=self.stdout, )
重要提示: 請務必將 my_project 的所有實例替換為您的 Django 專案的名稱。
如果您想複製並貼上此程式碼並繼續編程,您可以安全地停在此處,而無需閱讀本文的其餘部分。這是一個優雅的解決方案,將在您開發 Django 和 Celery 專案時為您提供良好的服務。但是,如果您想了解更多有關其工作原理的信息,請繼續閱讀。
我不會逐行查看此程式碼,而是按主題討論最有趣的部分。如果您還不熟悉 Django 自訂命令,您可能需要在繼續之前查看文件。
這部分感覺最神奇。在指令的handle()方法體內,呼叫了Django的內部autoreload.run_with_reloader()。它接受一個回調函數,每次專案中的 Python 檔案發生變更時都會執行該函數。 實際上是如何運作的?
讓我們來看看 autoreload.run_with_reloader() 函數原始碼的簡化版本。簡化的函數會重寫、內聯和刪除程式碼,使其操作更加清晰。
# NOTE: This has been dramatically pared down for clarity. def run_with_reloader(callback_func, *args, **kwargs): # NOTE: This will evaluate to False the first time it is run. is_inside_subprocess = os.getenv("RUN_MAIN") == "true" if is_inside_subprocess: # The reloader watches for Python file changes. reloader = get_reloader() django_main_thread = threading.Thread( target=callback_func, args=args, kwargs=kwargs ) django_main_thread.daemon = True django_main_thread.start() # When the code changes, the reloader exits with return code 3. reloader.run(django_main_thread) else: # Returns Python path and the arguments passed to the command. # Example output: ['/path/to/python', './manage.py', 'runworker'] args = get_child_arguments() subprocess_env = {**os.environ, "RUN_MAIN": "true"} while True: # Rerun the manage.py command in a subprocess. p = subprocess.run(args, env=subprocess_env, close_fds=False) if p.returncode != 3: sys.exit(p.returncode)
當manage.py runworker在命令列中執行時,它會先呼叫handle()方法,該方法會呼叫run_with_reloader()。
在 run_with_reloader() 內部,它將檢查名為 RUN_MAIN 的環境變數是否具有「true」值。當函數第一次被呼叫時,RUN_MAIN 應該沒有值。
當 RUN_MAIN 未設定為「true」時,run_with_reloader() 將進入循環。在迴圈內,它將啟動一個子進程,重新執行傳入的manage.py [command_name],然後等待該子進程退出。如果子進程退出並傳回程式碼 3,則循環的下一次迭代將啟動一個新的子進程並等待。這個循環將一直運行,直到子進程返回不為 3 的退出代碼(或直到使用者使用 ctrl + c 退出)。一旦得到非3的返回碼,就會完全退出程式。
產生的子程序再次執行manage.py指令(在我們的例子中是manage.py runworker),並且該指令將再次呼叫run_with_reloader()。這次,RUN_MAIN 將被設定為“true”,因為該命令在子進程中運行。
現在 run_with_reloader() 知道它位於子進程中,它將獲得一個監視檔案變更的重新載入器,將提供的回呼函數放入執行緒中,並將其傳遞給開始監視變更的重新載入器。
當重新載入器偵測到檔案變更時,它會執行 sys.exit(3)。這將退出子流程,從而觸發生成子流程的程式碼的下一次循環迭代。反過來,會啟動一個使用更新版本程式碼的新子流程。
預設情況下,Django 指令在執行其handle() 方法之前執行系統檢查。但是,對於 runserver 和我們的自訂 runworker 命令,我們希望延遲執行這些命令,直到進入我們提供給 run_with_reloader() 的回呼中。在我們的例子中,這是我們的 run_worker() 方法。這使我們能夠運行自動重新載入的命令,同時修復損壞的系統檢查。
為了延遲執行系統檢查,需要將requires_system_checks屬性的值設為空列表,並透過在run_worker()主體中呼叫self.check()來執行檢查。與 runserver 一樣,我們的自訂 runworker 命令也會檢查是否所有遷移都已執行,如果有待處理的遷移,它會顯示警告。
因為我們已經在 run_worker() 方法中執行 Django 的系統檢查,所以我們透過向 Celery 傳遞 --skip-checks 標誌來停用系統檢查,以防止重複工作。
所有與系統檢查和遷移相關的程式碼都是直接從 runserver 命令原始碼中提取的。
我們的實作使用 celery_app.worker_main() 直接從 Python 啟動 Celery 工作程序,而不是向 Celery 發動攻擊。
此程式碼在工作進程初始化時執行,顯示日期和時間、Django 版本以及退出命令。它是根據 runserver 啟動時顯示的資訊建模的。
以下行也從 runserver 原始碼中刪除:
我們的自訂命令具有可配置的日誌級別,以防開發人員希望在不修改程式碼的情況下從 CLI 調整設定。
我研究了 Django 和 Celery 的原始碼來建立這個實現,並且有很多擴展它的機會。您可以配置該命令以接受更多 Celery 的工作參數。或者,您可以建立一個自訂的 manage.py 命令,它會自動重新載入任何 shell 命令,就像 David Browne 在本要點中所做的那樣。
如果您覺得本文有用,請隨時留下按讚或留言。感謝您的閱讀。
以上是使用自訂 Django 命令自動重新載入 Celery 工作線程的詳細內容。更多資訊請關注PHP中文網其他相關文章!