Traitement des DAG avec Python asynchrone et graphlib

王林
Libérer: 2024-08-27 06:37:09
original
689 Les gens l'ont consulté

Processing DAGs with async Python and graphlib

Je suis récemment tombé sur un module intéressant dans la bibliothèque standard sans fond de Python : graphlib. Si vous n'avez jamais travaillé avec, il s'agit d'un petit utilitaire qui a été ajouté à Python 3.9 et qui n'implémente qu'une seule classe : TopologicalSorter.

Le nom est explicite : il s'agit d'une classe de tri topologique d'un graphique. Mais je ne pense pas qu'il ait été écrit à l'origine avec simplement un tri à l'esprit, car il possède une API plutôt énigmatique, mais incroyablement utile, telle que la méthode prepare() ou is_active(). Cet exemple dans la documentation fait allusion à la motivation derrière cela :

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)
Copier après la connexion

Graphlib n'est donc pas un module uniquement destiné à trier des graphiques, c'est également un utilitaire permettant d'exécuter des graphiques de tâches dans un ordre topologique, ce qui est utile si vos charges de travail ont des tâches dépendant des résultats d'autres tâches. Les graphiques sont un excellent moyen de modéliser ce problème, et l'ordre topologique permet de vous assurer que les tâches sont traitées dans le bon ordre.

Une chose qui manque dans la documentation est l'exemple asyncio, qui s'avère assez facile à écrire. Puisqu'avec asyncio, vous n'avez pas à vous soucier de la sécurité des threads, vous pouvez vous en sortir sans utiliser de file d'attente pour synchroniser les threads ou toute autre complexité supplémentaire de ce type.

Nous définirons une fonction simpliste de visiteur de nœud asynchrone :

async def visit(node: str, sorter: TopologicalSorter):
    print(f"processing node {node}")
    sorter.done(node)
Copier après la connexion

Dans le monde réel, cela peut être aussi complexe que vous le souhaitez, tant que vous effectuez un travail lié aux E/S, profitez donc des avantages de l'asyncio. L'élément important est d'appeler sorter.done(node) à la fin de la fonction pour faire savoir à l'instance de TopologicalSorter que nous en avons terminé avec ce nœud et que nous pouvons passer au suivant.

Ensuite, nous connectons la fonction de visite à notre exécution ordonnée topologiquement :

sorter = TopologicalSorter(graph)

sorter.prepare()

while sorter.is_active():
    node_group = sorter.get_ready()

    if not node_group:
        # no nodes are ready yet, so we sleep for a bit
        await asyncio.sleep(0.25)
    else:
        tasks = set()
        for node in node_group:
            task = asyncio.create_task(visit(node, sorter))
            tasks.add(task)
            task.add_done_callback(tasks.discard)
Copier après la connexion

Le code source complet d'un script fonctionnel peut être trouvé dans cet aperçu.

Un aspect particulier de graphlib est le format du graphique que le TopologicalSorter accepte comme argument : il est dans l'ordre inverse de votre représentation typique d'un graphique. Par ex. si vous avez un graphique comme celui-ci A -> B -> C, normalement vous le représenteriez comme ceci :

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}
Copier après la connexion

mais le TopologicalSorter veut que ce graphique soit dans le sens du bord inversé :

Si l'argument graphique facultatif est fourni, il doit s'agir d'un dictionnaire représentant un graphe acyclique orienté où les clés sont des nœuds et les valeurs sont des itérables de tous les prédécesseurs de ce nœud dans le graphique

Donc, la bonne façon de représenter A -> B -> C pour le TopologicalSorter est-ce :

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}
Copier après la connexion

Plus d'informations et un débat plutôt animé à ce sujet peuvent être trouvés ici : https://bugs.python.org/issue46071.

Bon codage !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal