如何在循环中实现气流DAG

2024-04-26 05:18:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我刚从气流开始。我想在一个循环中设置一个DAG,上一个DAG完成时下一个DAG开始。以下是我要实现的工作流程:

list_of_files = [......]
for file in list_of_files:
   dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
   t1 = BashOperator('copy_this_file', ....)
   t2 = BashOperator('process_this_file', ...)
   t1.set_downstream(t2)

如果我运行airflow backfill pipeline -s 2019-05-01,所有dag都将同时启动。在


Tags: ofdefaultpipelineargsfiles流程thislist
1条回答
网友
1楼 · 发布于 2024-04-26 05:18:58

DAG不能相互依赖,它们是独立的工作流。您希望将任务配置为相互依赖。可以有一个具有多个执行分支的单个DAG,每个文件对应一个,如下所示(未测试):

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
       t1 = BashOperator('copy_this_file', ....)
       t2 = BashOperator('process_this_file', ...)
       t1.set_downstream(t2)

相关问题 更多 >