首页 > 后端开发 > Python教程 > 如何测量 Celery 任务的执行时间?

如何测量 Celery 任务的执行时间?

Barbara Streisand
发布: 2025-01-13 22:28:44
原创
880 人浏览过

How do I measure the execution time of Celery tasks?

重复代码的集合中又添新成员:追踪 Celery 任务的执行时间。

每个 Celery 任务实际上有两个不同的“执行”时间:

  • 实际执行时间: 代码运行的时间。
  • “完成时间”: 包括在队列中等待可用工作进程的时间。

两者都很重要,因为我们的最终目标是了解任务何时完成

触发任务后,我们需要知道任务何时完成以及何时可以预期结果。这就像项目估算一样。管理者真正想知道的是项目何时完成,而不是它在一周内就能完成,但没有人有空在接下来的六个月内去做。

使用 Celery 信号

我们可以使用 Celery 信号来计时任务。

提示 1: Celery 信号的所有参数都是关键字参数。这意味着我们可以只列出我们感兴趣的关键字参数,并将其余参数打包到 **kwargs 中。这是个非常棒的设计!所有信号都应该采用这种方式!

提示 2: 我们可以将执行开始和结束时间存储在任务对象的“headers”属性中。

任务入队

当 Celery 任务进入队列时,记录当前时间:

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>
登录后复制

任务开始执行

当工作进程接收到任务时,记录当前时间:

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
登录后复制

任务执行结束

任务完成后,计算执行时间并将其存储到某个地方,例如 StatsD 或其他监控工具。

StatsD 是用于监控应用程序和检测任何软件以提供自定义指标的行业标准技术栈。

  • Netdata: StatsD 简介 [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>
登录后复制

额外功能:设置执行时间过长警告

可以在上述函数中添加硬编码阈值:

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
登录后复制

或者,可以根据任务定义设置多层阈值或阈值,或者任何可以在代码中表达的内容。

以上是如何测量 Celery 任务的执行时间?的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:php.cn
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板