중복 코드 모음에 Celery 작업 실행 시간 추적이라는 새 멤버가 추가되었습니다.
각 Celery 작업에는 실제로 두 가지 "실행" 시간이 있습니다.
우리의 궁극적인 목표는 작업이 언제 완료되는지 아는 것이기 때문에 두 가지 모두 중요합니다.
작업을 트리거한 후 작업이 언제 완료되고 언제 결과를 기대할 수 있는지 알아야 합니다. 그것은 프로젝트 견적과 같습니다. 관리자들이 정말로 알고 싶어하는 것은 프로젝트가 언제 완료될 것인지, 일주일 안에 완료될 것인지가 아니라 앞으로 6개월 안에 그것을 할 시간이 아무도 없을 것인지입니다.
Celery 신호를 사용하여 작업 시간을 측정할 수 있습니다.
팁 1: Celery 신호의 모든 매개변수는 키워드 매개변수입니다. 즉, 관심 있는 키워드 인수만 나열하고 나머지는 **kwargs
에 넣을 수 있습니다. 이것은 훌륭한 디자인입니다! 모든 신호는 이렇게 해야 합니다!
팁 2: 작업 개체의 "headers" 속성에 실행 시작 및 종료 시간을 저장할 수 있습니다.
Celery 작업이 대기열에 들어가면 현재 시간을 기록합니다.
from celery import signals from dateutil.parser import isoparse from datetime import datetime, timezone @signals.before_task_publish.connect def before_task_publish(*, headers: dict, **kwargs): raw_eta = headers.get("eta") publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc) headers["__publish_time"] = publish_time.isoformat()
작업자 프로세스가 작업을 수신하면 현재 시간을 기록합니다.
from celery import signals from datetime import datetime, timezone @signals.task_prerun.connect def task_prerun(*, task: Task, **kwargs): setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())
작업이 완료되면 실행 시간을 계산하여 StatsD나 기타 모니터링 도구 같은 곳에 저장합니다.
StatsD는 애플리케이션을 모니터링하고 소프트웨어를 계측하여 맞춤형 측정항목을 제공하기 위한 업계 표준 기술 스택입니다.
from celery import signals, Task from dateutil.parser import isoparse from datetime import datetime, timezone, timedelta def to_milliseconds(td: timedelta) -> int: return int(td.total_seconds() * 1000) @signals.task_postrun.connect def task_postrun(*, task: Task, **kwargs): now = datetime.now(tz=timezone.utc) publish_time = isoparse(getattr(task.request, "__publish_time", "")) prerun_time = isoparse(getattr(task.request, "__prerun_time", "")) exec_time = now - prerun_time if prerun_time else timedelta(0) waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0) waiting_and_exec_time = now - publish_time if publish_time else timedelta(0) stats = { "exec_time_ms": to_milliseconds(exec_time), "waiting_time_ms": to_milliseconds(waiting_time), "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time), } # TODO: 将统计数据发送到 StatsD 或其他监控工具 statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"]) # ... 发送其他统计数据 ...
위 함수에 하드코딩된 임계값을 추가할 수 있습니다.
if exec_time > timedelta(hours=1): logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")
또는 작업 정의 또는 코드로 표현할 수 있는 모든 항목을 기반으로 다중 수준 임계값 또는 임계값을 설정할 수 있습니다.
위 내용은 Celery 작업의 실행 시간을 어떻게 측정합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!