J'ai un opérateur python dans airflow exécuté à l'aide de cloud composer :
with DAG( dag_id = config['dag_id'], schedule_interval = config['schedule_interval'], default_args = default_args ) as dag: generate_data_task = PythonOperator( task_id = 'generate_dummy_data', python_callable = generate_data, dag = dag )
generate_data() écrit un fichier csv généré aléatoirement et nommé de manière unique dans un compartiment contenant des données. L'exécution telle quelle fonctionne bien, mais je souhaite exécuter la même tâche plusieurs fois en parallèle. Si je spécifie 10 exécutions parallèles, je m'attends à ce que 10 fichiers soient écrits dans le bucket. J'ai essayé la concurrence et task_concurrency mais j'ai obtenu le même résultat.
Cela peut-il être réalisé en utilisant le flux d'air au-dessus du cloud composer ?
Utilisez Mappage dynamique des tâches :
generate_data_task = PythonOperator.partial( task_id = 'generate_dummy_data', python_callable = generate_data, dag = dag ).expand(op_args=[[]] * 10)
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!