Voici comment la tâche « ajouter un hachage à un DataFrame existant » est passée de quelques jours à consommer presque un sprint entier.
Au deuxième trimestre 2022, j'ai commencé à travailler sur un pipeline de données qui récupère les données de marché à partir d'un service REST et les stocke dans une table BigQuery. Il s'agit d'une explication de haut niveau du pipeline. La partie intéressante est la manière dont les données sont interrogées, en les convertissant en DataFrame, puis en les téléchargeant dans des tables BigQuery à l'aide de GCSToBigQueryOperator d'AirFlow.
Au départ, cela semblait simple à écrire, mais le principe « idempotent » d'Airflow lui a ajouté un peu de défi. Ce qu'il faut récupérer de ce service REST a été décidé par une autre table et même si JOB est idempotent, la table qu'il a utilisée comme référence peut changer entre 2 exécutions. Après avoir passé plus de temps, les discussions avec les pipelines des Data Engineers étaient prêtes à la fin du troisième trimestre 2022.
Avance rapide jusqu'au premier trimestre 2024. À cette époque, davantage d'utilisateurs accédaient aux données et nous avons réalisé que notre modèle de requête n'utilisait pas correctement les partitions. Ou plutôt, nous voulions accéder aux données en fonction d'une colonne de chaîne, mais il n'est pas possible de partitionner sur une colonne de chaîne dans BigQuery. Cela a conduit à analyser de grandes quantités de données et à atteindre fréquemment le quota quotidien.
Cela nous a amené à réfléchir à la manière de partitionner les données en fonction de colonnes de chaînes. Notre ingénieur de données a suggéré de convertir cette colonne de chaîne en un entier à l'aide de FarmHash avec une opération modulo supplémentaire. Dans la preuve de concept, cela a réduit l'analyse de près de 90 % et les performances des requêtes ont été multipliées par 3 à 5. Nous avons décidé de procéder ainsi comme solution finale. Tout ce dont nous avions besoin était de :
Pour calculer les empreintes digitales FarmHash en Python, il existe un module pyfarmhash. J'ai installé le module et utilisé le code ci-dessous pour calculer le hachage, et localement tout a fonctionné comme souhaité.
def get_hash(val: str) -> int: return additonal_logic(pyfarmhash.fingerprint64(...)) df[‘hash’] = df[‘Col’].apply(get_hash)
Une fois tous les tests réussis, il était maintenant temps de transmettre le code à Airflow et de l'exécuter. Je ne m'attendais pas à ce que quelque chose se passe mal à ce stade. En fait, j'étais heureux que tout se soit déroulé comme prévu et dans les délais estimés.
Avec un esprit heureux et plein de confiance, j'ai poussé mes changements, j'ai commencé le travail, puis j'ai attendu 10 à 15 minutes pour qu'il se termine. Pendant ce temps, je suis passé à une autre tâche. Bientôt, j'ai reçu un e-mail d'échec inattendu d'Airflow. J'ai regardé les journaux et j'ai été surpris de voir qu'il a échoué lors de l'installation du module pyfarmhash !
Pour vous aider à comprendre le problème, je dois vous expliquer la structure du travail. Le travail comporte les étapes suivantes :
Dans ce processus, la tâche-1, qui télécharge les données, est un module Python distinct. Pour l'exécuter, j'ai utilisé le PythonVirtualenvOperator d'Airflow. Cet opérateur vous permet de spécifier des packages comme exigences, puis de les installer dans un environnement virtuel nouvellement créé. Une fois le package installé, toutes ses dépendances sont également installées et vous êtes prêt à démarrer.
J'ai ajouté pyfarmhash comme dépendance au module qui télécharge les données, et tout le reste est resté inchangé. Et c'est raté ! Pourquoi ?
pyfarmhash est une bibliothèque de hachage implémentée en C/C++. Lors de l'installation, GCC doit compiler le package, ce qui n'était pas présent sur l'hôte Airflow. Il était logique de ne pas avoir GCC sur l'hôte Airflow, mais malheureusement, c'était un bloqueur pour moi.
J'ai cherché une implémentation Python pure du package pyfarmhash, mais il n'y en avait pas. Ensuite, j’ai cherché des packages de roues, mais encore une fois, il n’y en avait aucun. J'ai envisagé de construire des packages de roues et de les promouvoir, mais cela aurait conduit à une responsabilité à long terme consistant à fournir des packages de roues en interne. Je voulais éviter des étapes supplémentaires de type solution de contournement. J'ai exploré toutes les options et en ai discuté avec l'équipe chargée de la maintenance d'Airflow. Ils ont suggéré de créer une image Docker et de l'exécuter dans KubernetesPodOperator. C'était une bonne option, car je pouvais contrôler l'environnement et inclure tout ce qui était nécessaire sans dépendre d'un environnement externe. De plus, cette solution n’avait aucune solution de contournement. Le seul inconvénient à court terme était qu’il fallait plus de temps pour le mettre en œuvre.
Avant de commencer avec une solution basée sur Docker, j'avais déjà consacré environ 16 à 20 heures à cette tâche. Pour la solution basée sur Docker, je devais également :
Comme je n'allais plus utiliser PythonVirtualEnvOperator dans Airflow, j'ai décidé de le supprimer complètement et d'améliorer également le flux de travail. J'ai dû changer le package python pour avoir des points d'entrée pour démarrer la logique de téléchargement et de purge
Il m'a fallu 30 à 36 heures supplémentaires pour avoir une solution finale avec l'image Docker prête, soit 6 à 7 jours ouvrables et, avec les 2 premiers jours inclus, c'est devenu une longue tâche de sprint.
J'y repense et je me demande si j'ai dû jeter une solution de travail, changer la structure du module, créer une image Docker, modifier plus de 10 tâches AirFlow pour utiliser l'image Docker pour les tâches, faire face à cette réalité et surmonter la frustration initiale. Tout cela uniquement parce que « Un seul module Python nécessitait « gcc » pour compiler ! »
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!