芹菜學起來可能令人望而生畏。雖然它的文檔很全面,但它傾向於跳過基礎知識。
這篇文章將定義 Celery 中的四個主要概念,討論 Celery 和 Kombu 之間的關係,並使用一些程式碼範例來說明 Celery 在實際應用程式中如何有用。這些範例將使用 Django Web 框架及其 @shared_task 裝飾器,但這些概念也適用於 Flask、FastAPI 等。
你很難在目前的Celery 文件中找到一個地方來清楚地說明它所認為的經紀人或後端,但是透過足夠的挖掘,你可以找到和推斷定義。
以下是您應該了解的概念開始使用 Celery 之前。
任務是Celery將非同步執行的一些工作(在這種情況下,這是「不立即」的一個奇特詞)。在 Web 應用程式中,一項任務可能是在使用者提交表單後發送電子郵件。發送電子郵件可能是一個需要多秒鐘的操作,並且強制使用者在重定向之前等待電子郵件發送可能會讓應用程式感覺很慢。
任務是使用 Celery 中的裝飾器定義的。下面我們使用 @shared_task 裝飾器將 send_thank_you_email() 轉換為 Celery 任務,可以在 Submit_feedback() 表單提交處理程序中使用。
from config.celery import shared_task from django.core.mail import send_mail from django.shortcuts import render, redirect from feedback.forms import FeedbackForm @shared_task def send_thank_you_email(email_address): send_mail( "Thank you for your feedback!", "We appreciate your input.", "noreply@example.com", [email_address], ) def submit_feedback(request): if request.method == "POST": form = FeedbackForm(request.POST) if form.is_valid(): form.save() # Push the task to the broker using the delay() method. send_thank_you_email.delay(form.cleaned_data["email"]) return redirect("/thank-you/") else: form = FeedbackForm() return render(request, "feedback.html", {"form": form})
當在 Celery 中使用裝飾器定義任務時,它會為任務新增一個delay() 方法。您可以看到在成功儲存表單後,send_thank_you_email 任務呼叫了上例中的delay() 方法。當delay()被呼叫時,它會將send_thank_you_email任務及其資料發送到儲存它的broker,稍後將由worker執行,此時用戶將已透過電子郵件發送此時用戶。
如果您在儲存表單後需要發送額外的電子郵件,那麼將工作推送到 Celery 的好處就變得更加明顯。例如,您可能想向客戶支援團隊發送電子郵件,告知他們收到了新的回饋。對於 Celery,這幾乎不會增加回應時間。
Celery 任務還允許額外的進階配置。如果電子郵件傳送失敗,您可以對任務進行編碼以自動重試並配置 max_retries、retry_backoff、retry_jitter 等設定。
Celery 增強提案的術語表對 訊息代理有以下說法:
企業整合模式將訊息代理定義為一個架構建構塊,它可以從多個目的地接收訊息,確定正確的目的地並將訊息路由到正確的通道。
出於我們使用 Celery 的目的,我們將考慮 代理 儲存建立的任務的「訊息傳輸」。經紀人其實並不執行任務:那是工人的工作。相反,代理是計劃任務在計劃任務時存儲到的地方,以及在工作人員最終執行任務時從拉取的地方。代理是 Celery 工作的必需的組件,並且 Celery 將只連接到一個代理。
Celery 的後端和代理頁面列出了一些其支援的代理,並且還有其他未列出的實驗性代理(例如 SQLAlchemy)。這些代理程式(或「訊息傳輸」)由 Celery 維護的 Python 訊息傳輸庫(稱為 Kombu)管理。當尋找有關配置代理的資訊時,查閱 Kombu 的文檔而不是 Celery 的文檔有時會很有幫助。
有些代理具有任務扇出和優先權等高級功能,而其他代理則作為簡單佇列運行。
worker 是 Celery 的一個實例,它從代理程式中提取任務並執行 Python 應用程式中定義的任務函數。 Celery 能夠在其工作執行緒中執行 Python 程式碼,因為 Celery 本身是用 Python 編寫的。
許多worker可以同時執行來執行任務。當您執行 celery worker 命令時,預設情況下它會為電腦的每個核心啟動一個工作執行緒。如果你的電腦有 16 個核心,執行 celery Worker 就會啟動 16 個 Worker。
如果沒有工作執行緒在執行,訊息(任務)將在代理程式中累積,直到工作執行緒可以執行它們。
Celery 使用者指南中的任務頁面有以下關於後端的說明:
If you want to keep track of tasks or need the return values, then Celery must store or send the states somewhere so that they can be retrieved later. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), and Redis – or you can define your own.
TLDR: a backend tracks the outcomes and returned results of async tasks. What does that actually mean, and when could it be useful?
Imagine you are building an accounting app in Django that can generate an annual report. The report could take minutes to generate.
To give your users a more responsive experience, you use an AJAX request to kick off a report generation task. That request returns an ID of the task, which it can use to poll the server every few seconds to see if the report is generated. Once the task is complete, it will return the ID of the report, which the client can use to display a link to the report via JavaScript.
We can implement this with Celery and Django using the following code:
from celery import shared_task from django.http import JsonResponse from django.views.decorators.http import require_http_methods from accounting.models import Asset from accounting.reports import AnnualReportGenerator @shared_task def generate_report_task(year): # This could take minutes... report = AnnualReportGenerator().generate(year) asset = Asset.objects.create( name=f"{year} annual report", url=report.url, ) return asset.id @require_http_methods(["POST"]) def generate_annual_report_view(request): year = request.POST.get("year") task = generate_report_task.delay(year) return JsonResponse({"taskId": task.id}) def get_annual_report_generation_status_view(request, task_id): task = generate_report_task.AsyncResult(task_id) # The status is typically "PENDING", "SUCCESS", or "FAILURE" status = task.status return JsonResponse({"status": status, "assetId": task.result})
In this example, the asset ID returned by generate_report_task() is stored in a backend. The backend stores the outcome and returned result. The backend does not store the status of yet-to-be-processed tasks: these will only be added once there has been an outcome. A task that returns "PENDING" has a completely unknown status: an associated task might not even exist. Tasks will typically return "SUCCESS" or "FAILURE", but you can see all statuses in the Celery status docs.
Having a backend is not required for Celery to run tasks. However, it is required if you ever need to check the outcome of a task or return a task's result. If you try to check a task's status when Celery does not have a backend configured, an exception will be raised.
I hope this post helps you understand the individual pieces of Celery and why you might consider using it. While the official documentation is challenging to grok, learning Celery deeply can unlock new possibilities within your Python applications.
以上是了解 Celery 中的任務、代理人、工作人員和後端的詳細內容。更多資訊請關注PHP中文網其他相關文章!