如何使用“分支”操作符在气流DAG中分支多条路径?

2024-04-24 20:43:20 发布

您现在位置:Python中文网/ 问答频道 /正文

这就是我想要的,但我不知道如何在气流中实现这一点,因为这两项任务都在执行中

enter image description here

总结如下:

  • T1执行
  • T2执行
  • 基于T2的输出,我想或者option_1 -> complete或者option_2 -> Do_x, Do_y -> complete

我应该如何构造这个?我的当前代码是:

(t1 >> t2 >> option_1 >> complete)
(t1 >> t2 >> option_2 >> do_x >> do_y >> complete)

在这种情况下,t2是一个分支操作符

我还尝试了... [option_1, option_2] ... 的语法,但我需要一个完全独立的路径来执行,而不仅仅是切换一个任务


Tags: 代码路径分支语法情况docompleteoption
1条回答
网友
1楼 · 发布于 2024-04-24 20:43:20

代码中的依赖关系对于分支是正确的。确保BranchPythonOperator根据所需的逻辑,在分支开始时返回任务的task_id。更多关于BranchPythonOperatorhere的信息。最后一个重要注意事项与“完成”任务有关。由于分支聚合在“complete”任务上,请确保将trigger_rule设置为“none_failed”(您也可以使用TriggerRule类常量),以便不会跳过该任务

快速代码测试供您参考:

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import datetime


DEFAULT_ARGS = dict(
    start_date=datetime(2021, 5, 5),
    owner="airflow",
    retries=0,
)

DAG_ARGS = dict(
    dag_id="multi_branch",
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    catchup=False,
)


def random_branch():
    from random import randint

    return "option_1" if randint(1, 2) == 1 else "option_2"


with DAG(**DAG_ARGS) as dag:
    t1 = DummyOperator(task_id="t1")

    t2 = BranchPythonOperator(task_id="t2", python_callable=random_branch)

    option_1 = DummyOperator(task_id="option_1")

    option_2 = DummyOperator(task_id="option_2")

    do_x = DummyOperator(task_id="do_x")

    do_y = DummyOperator(task_id="do_y")

    complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)

    t1 >> t2 >> option_1 >> complete
    t1 >> t2 >> option_2 >> do_x >> do_y >> complete

DAG with BranchPythonOperator

相关问题 更多 >