循环之后,如何为一些独立的静态任务提供任务依赖关系。气流

2024-04-24 22:12:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个for循环,还有一些中间任务和循环后的一些任务

我只在for循环中给出任务依赖性,正如许多帖子中提到的:

例如:

individual_task1 = SSHOperator (task_id='tk_one'....)

individual_task2 = SSHOperator (task_id='tk_two'....)

individual_task3 = SSHOperator (task_id='tk_three'....)

  for i in [val1,val2,val3,val4.....valn]

    first_task_in_loop = SSHSparkSubmitOperator (task_id='comp_' + i,...)

    second_task_in_loop = SSHOperator(task_id='stats_' + i...)

    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2 >> individual_task3

但是对于单个任务2和单个任务3,我得到错误:

Broken Dag ,, task_id already registered .

但这是一个循环中没有定义的单个任务,那么为什么我会出现这个错误或者我做错了什么


Tags: inloopidfortask错误individualtk
1条回答
网友
1楼 · 发布于 2024-04-24 22:12:01

试试这个:

individual_task1 = SSHOperator (task_id='tk_one'....)

individual_task2 = SSHOperator (task_id='tk_two'....)

individual_task3 = SSHOperator (task_id='tk_three'....)

  for i in [val1,val2,val3,val4.....valn]

    first_task_in_loop = SSHSparkSubmitOperator (task_id='comp_' + i,...)

    second_task_in_loop = SSHOperator(task_id='stats_' + i...)

    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2

individual_task2 >> individual_task3

可能是因为您多次设置了相同的任务流而抱怨

相关问题 更多 >