Un nouveau membre a été ajouté à la collection de codes en double : le suivi du temps d'exécution des tâches Celery.
Chaque tâche Céleri a en fait deux temps "d'exécution" différents :
Les deux sont importants car notre objectif ultime est de savoir quand la tâche est terminée.
Après avoir déclenché une tâche, nous devons savoir quand la tâche est terminée et quand nous pouvons en attendre les résultats. C'est comme l'estimation d'un projet. Ce que les managers veulent vraiment savoir, c'est quand le projet sera terminé, non pas qu'il sera terminé en une semaine mais que personne n'aura le temps de le faire dans les six prochains mois.
Nous pouvons utiliser les signaux de céleri pour chronométrer les tâches.
Astuce 1 : Tous les paramètres des signaux de céleri sont des paramètres de mots clés. Cela signifie que nous pouvons simplement lister les arguments de mots-clés qui nous intéressent et regrouper le reste dans **kwargs
. C'est une superbe conception ! Tous les signaux doivent être effectués de cette façon !
Astuce 2 : Nous pouvons stocker l'heure de début et de fin d'exécution dans la propriété "headers" de l'objet tâche.
Lorsque la tâche Céleri entre dans la file d'attente, enregistrez l'heure actuelle :
<code class="language-python">from celery import signals from dateutil.parser import isoparse from datetime import datetime, timezone @signals.before_task_publish.connect def before_task_publish(*, headers: dict, **kwargs): raw_eta = headers.get("eta") publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc) headers["__publish_time"] = publish_time.isoformat()</code>
Lorsque le processus de travail reçoit la tâche, enregistrez l'heure actuelle :
<code class="language-python">from celery import signals from datetime import datetime, timezone @signals.task_prerun.connect def task_prerun(*, task: Task, **kwargs): setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
Lorsque la tâche est terminée, calculez le temps d'exécution et stockez-le quelque part, comme StatsD ou un autre outil de surveillance.
StatsD est la pile technologique standard de l'industrie pour surveiller les applications et instrumenter tout logiciel afin de fournir des métriques personnalisées.
<code class="language-python">from celery import signals, Task from dateutil.parser import isoparse from datetime import datetime, timezone, timedelta def to_milliseconds(td: timedelta) -> int: return int(td.total_seconds() * 1000) @signals.task_postrun.connect def task_postrun(*, task: Task, **kwargs): now = datetime.now(tz=timezone.utc) publish_time = isoparse(getattr(task.request, "__publish_time", "")) prerun_time = isoparse(getattr(task.request, "__prerun_time", "")) exec_time = now - prerun_time if prerun_time else timedelta(0) waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0) waiting_and_exec_time = now - publish_time if publish_time else timedelta(0) stats = { "exec_time_ms": to_milliseconds(exec_time), "waiting_time_ms": to_milliseconds(waiting_time), "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time), } # TODO: 将统计数据发送到 StatsD 或其他监控工具 statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"]) # ... 发送其他统计数据 ...</code>
Il est possible d'ajouter un seuil codé en dur dans la fonction ci-dessus :
<code class="language-python">if exec_time > timedelta(hours=1): logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
Alternativement, on peut définir des seuils à plusieurs niveaux ou des seuils basés sur la définition de la tâche, ou tout ce qui peut être exprimé dans le code.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!