> 백엔드 개발 > 파이썬 튜토리얼 > Celery 작업의 실행 시간을 어떻게 측정합니까?

Celery 작업의 실행 시간을 어떻게 측정합니까?

Barbara Streisand
풀어 주다: 2025-01-13 22:28:44
원래의
880명이 탐색했습니다.

How do I measure the execution time of Celery tasks?

중복 코드 모음에 Celery 작업 실행 시간 추적이라는 새 멤버가 추가되었습니다.

각 Celery 작업에는 실제로 두 가지 "실행" 시간이 있습니다.

  • 실제 실행 시간: 코드가 실행되는 데 걸리는 시간입니다.
  • "완료 시간": 사용 가능한 작업자 프로세스를 위해 대기열에서 대기하는 데 소요된 시간을 포함합니다.

우리의 궁극적인 목표는 작업이 언제 완료되는지 아는 것이기 때문에 두 가지 모두 중요합니다.

작업을 트리거한 후 작업이 언제 완료되고 언제 결과를 기대할 수 있는지 알아야 합니다. 그것은 프로젝트 견적과 같습니다. 관리자들이 정말로 알고 싶어하는 것은 프로젝트가 언제 완료될 것인지, 일주일 안에 완료될 것인지가 아니라 앞으로 6개월 안에 그것을 할 시간이 아무도 없을 것인지입니다.

셀러리 신호 사용

Celery 신호를 사용하여 작업 시간을 측정할 수 있습니다.

팁 1: Celery 신호의 모든 매개변수는 키워드 매개변수입니다. 즉, 관심 있는 키워드 인수만 나열하고 나머지는 **kwargs에 넣을 수 있습니다. 이것은 훌륭한 디자인입니다! 모든 신호는 이렇게 해야 합니다!

팁 2: 작업 개체의 "headers" 속성에 실행 시작 및 종료 시간을 저장할 수 있습니다.

작업 참여

Celery 작업이 대기열에 들어가면 현재 시간을 기록합니다.

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>
로그인 후 복사

작업 실행 시작

작업자 프로세스가 작업을 수신하면 현재 시간을 기록합니다.

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
로그인 후 복사

작업 실행이 종료되었습니다

작업이 완료되면 실행 시간을 계산하여 StatsD나 기타 모니터링 도구 같은 곳에 저장합니다.

StatsD는 애플리케이션을 모니터링하고 소프트웨어를 계측하여 맞춤형 측정항목을 제공하기 위한 업계 표준 기술 스택입니다.

  • 넷데이터: StatsD 소개 [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>
로그인 후 복사

추가 기능: 긴 실행 시간 경고 설정

위 함수에 하드코딩된 임계값을 추가할 수 있습니다.

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
로그인 후 복사

또는 작업 정의 또는 코드로 표현할 수 있는 모든 항목을 기반으로 다중 수준 임계값 또는 임계값을 설정할 수 있습니다.

위 내용은 Celery 작업의 실행 시간을 어떻게 측정합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
저자별 최신 기사
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿