Execute the same operator as multiple tasks in Cloud Composer

PHPz
Release: 2024-02-08 21:08:30


Question content

I have a PythonOperator that runs in Airflow on Cloud Composer:

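from airflow import DAG
from airflow.operators.python import PythonOperator

# config, default_args and generate_data are defined elsewhere in the DAG file.
with DAG(
    dag_id=config['dag_id'],
    schedule_interval=config['schedule_interval'],
    default_args=default_args,
) as dag:

    generate_data_task = PythonOperator(
        task_id='generate_dummy_data',
        python_callable=generate_data,
        dag=dag,  # redundant inside the `with DAG(...)` block, but harmless
    )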

The generate_data() function writes a uniquely named CSV file of randomly generated data to a bucket. Running it as is works fine, but I want to run the same task several times in parallel: if I specify 10 parallel executions, I expect 10 files to be written to the bucket. I've tried concurrency and task_concurrency, but got the same result.

Can this be done with Airflow on Cloud Composer?
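Since the question does not show the body of generate_data(), here is a hypothetical stand-in for illustration. It assumes a local directory in place of the Cloud Storage bucket and uses uuid4 for the unique filename; the point is only that every call creates a new file, so N task runs should leave N files.

```python
import csv
import os
import random
import uuid


def generate_data(output_dir: str = "dummy_bucket", rows: int = 5) -> str:
    """Hypothetical stand-in for the question's generate_data().

    Writes a CSV of random values to a uniquely named file. A local
    directory stands in for the Cloud Storage bucket (assumption).
    """
    os.makedirs(output_dir, exist_ok=True)
    # uuid4 gives each call its own filename, so N calls produce N files.
    path = os.path.join(output_dir, f"dummy_{uuid.uuid4().hex}.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "value"])  # header row
        for i in range(rows):
            writer.writerow([i, random.random()])
    return path
```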


Correct Answer


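Use dynamic task mapping (added in Airflow 2.3). Pass the arguments shared by every instance to partial(), then pass a list to expand(); Airflow creates one task instance per list element: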

# Requires Airflow 2.3+ (dynamic task mapping).
generate_data_task = PythonOperator.partial(
    task_id='generate_dummy_data',
    python_callable=generate_data,
    dag=dag,
).expand(op_args=[[]] * 10)  # 10 mapped instances, each calling generate_data() with no args
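To see why expand() yields ten files while the concurrency settings did not, the mapping can be approximated locally in plain Python. This is a simulation under stated assumptions, not Airflow code: run_mapped, generate_data and the temporary output directory are hypothetical. Each element of the op_args list becomes one call of python_callable(*op_args), and max_parallel stands in for a concurrency cap: it limits how many calls run at once, never how many exist.

```python
import os
import tempfile
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def run_mapped(python_callable: Callable, op_args_list: Sequence[Sequence],
               max_parallel: int = 4) -> List:
    """Local simulation (not Airflow) of partial(...).expand(op_args=...):
    one call per element, each invoked as python_callable(*op_args).
    max_parallel mimics a concurrency cap: it bounds simultaneous runs
    but does not change how many runs there are."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = [pool.submit(python_callable, *args) for args in op_args_list]
        # Collect results in submission order, like map indexes 0..N-1.
        return [f.result() for f in futures]


OUTPUT_DIR = tempfile.mkdtemp()  # hypothetical stand-in for the bucket


def generate_data() -> str:
    """Hypothetical generate_data(): writes one uniquely named CSV."""
    path = os.path.join(OUTPUT_DIR, f"dummy_{uuid.uuid4().hex}.csv")
    with open(path, "w") as f:
        f.write("id,value\n0,42\n")
    return path


# Mirrors expand(op_args=[[]] * 10): ten instances, no positional args.
paths = run_mapped(generate_data, [[]] * 10)
print(len(set(paths)))  # 10
```

In the real DAG, each element of the op_args list is the positional-argument list for one mapped instance, so expand(op_args=[[i] for i in range(10)]) would call generate_data(0) through generate_data(9) if the function accepted an index. Settings such as task_concurrency (max_active_tis_per_dag in newer Airflow releases) only limit how many of those instances run simultaneously; they do not create additional ones, which is why they had no effect on the number of files.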


Source: stackoverflow.com