I have a PythonOperator executed using Cloud Composer in Airflow:
```python
with DAG(
    dag_id=config['dag_id'],
    schedule_interval=config['schedule_interval'],
    default_args=default_args,
) as dag:
    generate_data_task = PythonOperator(
        task_id='generate_dummy_data',
        python_callable=generate_data,
        dag=dag,
    )
```
The generate_data() function writes a randomly generated, uniquely named CSV file containing some data to a bucket (roughly like the sketch below). Executing it as-is works fine, but I want to run the same task multiple times in parallel. If I specify 10 parallel executions, I expect 10 files to be written to the bucket. I've tried the concurrency and task_concurrency settings, but got the same result.
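For context, here is a minimal sketch of what generate_data() does; the bucket name, the google-cloud-storage client usage, and the CSV contents are illustrative assumptions, not the actual function:

```python
import csv
import io
import uuid

from google.cloud import storage


def generate_data():
    # Build a small CSV in memory with some dummy data.
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["id", "value"])
    writer.writerow([str(uuid.uuid4()), 42])

    # Upload it to GCS under a unique name.
    client = storage.Client()
    bucket = client.bucket("my-dummy-data-bucket")  # hypothetical bucket name
    blob = bucket.blob(f"dummy_data_{uuid.uuid4()}.csv")
    blob.upload_from_string(buffer.getvalue(), content_type="text/csv")
```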
Can this be implemented using Airflow on top of Cloud Composer?
Use Dynamic Task Mapping (available since Airflow 2.3):
```python
generate_data_task = PythonOperator.partial(
    task_id='generate_dummy_data',
    python_callable=generate_data,
    dag=dag,
).expand(op_args=[[]] * 10)
```
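Here `.partial()` holds the arguments that are identical for every task instance, while `.expand()` creates one mapped task instance per element of the supplied list, so `op_args=[[]] * 10` yields 10 copies of the task, each called with no positional arguments; they can run in parallel subject to your pool and parallelism settings. A complete minimal sketch, assuming Airflow 2.3+ (Cloud Composer 2) and a literal dag_id and schedule in place of the question's config dict:

```python
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="generate_dummy_data_parallel",  # assumed; the question reads this from config
    schedule_interval=None,                 # assumed; the question reads this from config
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    generate_data_task = PythonOperator.partial(
        task_id="generate_dummy_data",
        python_callable=generate_data,
    ).expand(op_args=[[]] * 10)  # 10 mapped instances -> 10 files in the bucket
```

Since each mapped instance calls generate_data() independently and the function names each file uniquely, 10 parallel runs produce 10 distinct files in the bucket.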