BranchPythonOperator在动态dag中未按预期工作

2024-04-27 02:43:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经编写了动态dag并添加了条件分支逻辑。但是Airflow正在执行所有任务,而不是执行一个任务。你知道吗

我尝试过使用BranchOperator,既使用return方法,又在XCOM中传递任务id

TABLE_LIST = [{'name': 'superman', 'universe': 'dc'}, {'name': 'batman', 'universe': 'dc'}, {'name': 'iron_man', 'universe': 'marvel'}]


def check_what_to_do(superhero, **kwargs):

    name = superhero['name']
    universe = superhero['universe']
    kwargs['ti'].xcom_push(key=name, value=universe)

def branch(superhero, **kwargs):

    name = superhero['name']
    universe = superhero['universe']

    print(kwargs['ti'].xcom_pull(task_ids=name, key=name))

    if universe == 'dc':
        return name+'_dc'
    elif universe == 'marvel':
        return name+'_marvel'

def branch_op(superhero):

    task_id = 'branch_' + superhero['name']

    return BranchPythonOperator(

            dag=dag,
            task_id=task_id,
            python_callable=branch,
            provide_context=True,
            op_kwargs={
                'superhero':superhero
            }
        )

def check_what_to_do_op(superhero):
        task_id = superhero['name']

        return PythonOperator(
            dag=dag,
            task_id=task_id,
            python_callable=check_what_to_do,
            provide_context=True,
            op_kwargs={
                'superhero':superhero
            }
        )

    for hero in TABLE_LIST:

        task_id_marvel = hero['name']+'_marvel'
        task_id_dc = hero['name']+'_dc'
        check_what_to_do_op(hero) >> branch_op(hero)
        branch_op(hero) >> DummyOperator(dag=dag, task_id=task_id_marvel)
        branch_op(hero) >> DummyOperator(dag=dag, task_id=task_id_dc)

我只希望运行一个分支,跳过另一个分支,但两个分支都在执行。你知道吗


Tags: namebranchidtaskreturndef分支dc