本文在上一篇有關公平處理的文章的基礎上探討了 Celery 中的任務優先順序。任務優先級提供了一種透過根據自訂標準為任務分配不同優先順序來增強後台處理的公平性和效率的方法。
任務級優先順序提供對任務執行的細粒度控制,無需複雜的實作。透過將所有任務提交到具有指定優先值的單一佇列,工作人員可以根據任務的緊急程度處理任務。這確保了公平處理,無論提交時間如何。
例如,如果一個租戶提交了 100 個任務,而另一個租戶不久後提交了 5 個任務,則任務級別優先級會阻止第二個租戶等待所有 100 個任務完成。
這種方法根據租用戶的任務計數動態分配優先順序。 每個租戶的第一個任務以高優先級開始,但每有 10 個並發任務,優先順序就會降低。這可以確保任務較少的租戶不會遇到不必要的延誤。
首先,安裝 Celery 和 Redis:
pip install celery redis
配置 Celery 使用 Redis 作為代理並啟用基於優先順序的任務處理:
from celery import Celery app = Celery( "tasks", broker="redis://localhost:6379/0", broker_connection_retry_on_startup=True, ) app.conf.broker_transport_options = { "priority_steps": list(range(10)), "sep": ":", "queue_order_strategy": "priority", }
定義一個方法來計算動態優先權,使用Redis來快取每個租用戶的任務計數:
import redis redis_client = redis.StrictRedis(host="localhost", port=6379, db=1) def calculate_priority(tenant_id): """ Calculate task priority based on the number of tasks for the tenant. """ key = f"tenant:{tenant_id}:task_count" task_count = int(redis_client.get(key) or 0) return min(10, task_count // 10)
建立自訂任務類別以在成功完成後減少任務計數:
from celery import Task class TenantAwareTask(Task): def on_success(self, retval, task_id, args, kwargs): tenant_id = kwargs.get("tenant_id") if tenant_id: key = f"tenant:{tenant_id}:task_count" redis_client.decr(key, 1) return super().on_success(retval, task_id, args, kwargs) @app.task(name="tasks.send_email", base=TenantAwareTask) def send_email(tenant_id, task_data): """ Simulate sending an email. """ sleep(1) key = f"tenant:{tenant_id}:task_count" task_count = int(redis_client.get(key) or 0) logger.info("Tenant %s tasks: %s", tenant_id, task_count)
為不同租用戶觸發任務,確保tenant_id包含在任務的關鍵字參數中:
if __name__ == "__main__": tenant_id = 1 for _ in range(100): priority = calculate_priority(tenant_id) key = f"tenant:{tenant_id}:task_count" redis_client.incr(key, 1) send_email.apply_async( kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority ) tenant_id = 2 for _ in range(10): priority = calculate_priority(tenant_id) key = f"tenant:{tenant_id}:task_count" redis_client.incr(key, 1) send_email.apply_async( kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority )
您可以在此處查看完整程式碼。
啟動 Celery Worker 並觸發任務:
# Run the worker celery -A tasks worker --loglevel=info # Trigger the tasks python tasks.py
此設定示範了 Celery 的優先權佇列如何與 Redis 結合,透過根據租戶活動動態調整優先權來確保公平的任務處理。讓我們來看看工作人員的簡化輸出:
Celery 和 Redis 的任務優先順序為確保多租戶系統中的公平處理提供了強大的解決方案。透過動態分配優先權並利用單一佇列,您可以在滿足業務需求的同時保持簡單性。
實現任務優先級的方法有很多,例如使用 RabbitMQ 效率更高,因為它的核心支援優先級,但由於我們也使用 Redis 進行任務計數,因此它簡化了我們的整體架構。
希望您覺得這篇文章很有用,並請參考下一篇!
以上是確保芹菜的公平加工 - 第二部分的詳細內容。更多資訊請關注PHP中文網其他相關文章!