我在 airflow 中有一個使用 cloud composer 執行的 pythonoperator:
with DAG( dag_id = config['dag_id'], schedule_interval = config['schedule_interval'], default_args = default_args ) as dag: generate_data_task = PythonOperator( task_id = 'generate_dummy_data', python_callable = generate_data, dag = dag )
generate_data() 函數將隨機產生的唯一命名的 csv 檔案寫入包含一些資料的儲存桶中。按原樣執行效果很好,但我想並行多次執行同一任務。如果我指定並行執行 10 次,我預計會在儲存桶中寫入 10 個檔案。我嘗試過並發和task_concurrency,但得到了相同的結果。
這可以在 cloud composer 之上使用 airflow 實現嗎?
使用動態任務映射:
generate_data_task = PythonOperator.partial( task_id = 'generate_dummy_data', python_callable = generate_data, dag = dag ).expand(op_args=[[]] * 10)
以上是在 Cloud Composer 中將相同運算子作為多個任務執行的詳細內容。更多資訊請關注PHP中文網其他相關文章!