我已经编写了动态dag并添加了条件分支逻辑。但是Airflow正在执行所有任务,而不是执行一个任务。你知道吗
我尝试过使用BranchOperator,既使用return方法,又在XCOM中传递任务id
TABLE_LIST = [{'name': 'superman', 'universe': 'dc'}, {'name': 'batman', 'universe': 'dc'}, {'name': 'iron_man', 'universe': 'marvel'}]
def check_what_to_do(superhero, **kwargs):
name = superhero['name']
universe = superhero['universe']
kwargs['ti'].xcom_push(key=name, value=universe)
def branch(superhero, **kwargs):
name = superhero['name']
universe = superhero['universe']
print(kwargs['ti'].xcom_pull(task_ids=name, key=name))
if universe == 'dc':
return name+'_dc'
elif universe == 'marvel':
return name+'_marvel'
def branch_op(superhero):
task_id = 'branch_' + superhero['name']
return BranchPythonOperator(
dag=dag,
task_id=task_id,
python_callable=branch,
provide_context=True,
op_kwargs={
'superhero':superhero
}
)
def check_what_to_do_op(superhero):
task_id = superhero['name']
return PythonOperator(
dag=dag,
task_id=task_id,
python_callable=check_what_to_do,
provide_context=True,
op_kwargs={
'superhero':superhero
}
)
for hero in TABLE_LIST:
task_id_marvel = hero['name']+'_marvel'
task_id_dc = hero['name']+'_dc'
check_what_to_do_op(hero) >> branch_op(hero)
branch_op(hero) >> DummyOperator(dag=dag, task_id=task_id_marvel)
branch_op(hero) >> DummyOperator(dag=dag, task_id=task_id_dc)
我只希望运行一个分支,跳过另一个分支,但两个分支都在执行。你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐