Home > Backend Development > Python Tutorial > How do I measure the execution time of Celery tasks?

How do I measure the execution time of Celery tasks?

Barbara Streisand
Release: 2025-01-13 22:28:44
Original
880 people have browsed it

How do I measure the execution time of Celery tasks?

A new member has been added to the collection of duplicate codes: tracking the execution time of Celery tasks.

Each Celery task actually has two different "execution" times:

  • Actual execution time: The time it takes for the code to run.
  • "Completion time": Includes time spent waiting in the queue for an available worker process.

Both are important because our ultimate goal is to know when the task is complete.

After triggering a task, we need to know when the task is completed and when we can expect the results. It's like project estimating. What managers really want to know is when the project will be completed, not that it will be completed in a week but no one will have time to do it in the next six months.

Use Celery Signals

We can use Celery signals to time tasks.

Tip 1: All parameters of Celery signals are keyword parameters. This means we can just list the keyword arguments we're interested in and pack the rest into **kwargs. This is a great design! All signals should be done this way!

Tip 2: We can store the execution start and end time in the "headers" property of the task object.

Task joining

When the Celery task enters the queue, record the current time:

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>
Copy after login

Task execution begins

When the worker process receives the task, record the current time:

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
Copy after login

Task execution ended

When the task is completed, calculate the execution time and store it somewhere, such as StatsD or other monitoring tool.

StatsD is the industry standard technology stack for monitoring applications and instrumenting any software to provide custom metrics.

  • Netdata: StatsD Introduction [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>
Copy after login

Extra feature: Set long execution time warning

It is possible to add a hardcoded threshold in the above function:

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
Copy after login

Alternatively, one can set multi-level thresholds or thresholds based on the task definition, or whatever can be expressed in code.

The above is the detailed content of How do I measure the execution time of Celery tasks?. For more information, please follow other related articles on the PHP Chinese website!

source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Articles by Author
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template