Saya mempunyai pengendali python dalam aliran udara yang dilaksanakan menggunakan komposer awan:
with DAG( dag_id = config['dag_id'], schedule_interval = config['schedule_interval'], default_args = default_args ) as dag: generate_data_task = PythonOperator( task_id = 'generate_dummy_data', python_callable = generate_data, dag = dag )
generate_data() menulis fail csv bernama unik yang dijana secara rawak ke dalam baldi yang mengandungi beberapa data. Melaksanakan apa adanya berfungsi dengan baik, tetapi saya mahu melaksanakan tugas yang sama beberapa kali secara selari. Jika saya menyatakan 10 pelaksanaan selari, saya menjangkakan 10 fail akan ditulis ke baldi. Saya telah mencuba concurrency dan task_concurrency tetapi mendapat hasil yang sama.
Bolehkah ini dicapai menggunakan aliran udara di atas komposer awan?
Gunakan Pemetaan Tugasan Dinamik:
generate_data_task = PythonOperator.partial( task_id = 'generate_dummy_data', python_callable = generate_data, dag = dag ).expand(op_args=[[]] * 10)
Atas ialah kandungan terperinci Laksanakan operator yang sama seperti berbilang tugas dalam Cloud Composer. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!