气流分支Python操作员未跟随指定分支

2024-04-19 05:56:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个气流DAG,结构如下。在

Airflow Graph View

所有以“check*”开头的函数都是branchpythonooperator,而函数exceptionControl是一个ExecuteDagRunOperator,它接收每个错误以处理它们。在

这是DAG配置:

checkCloudFunctions = BranchPythonOperator(
    task_id='checkCloudFunctions',
    python_callable=check_cloud_functions,
    provide_context=True,
    dag=dag)

checkSqlTables = BranchPythonOperator(
    task_id='checkSqlTables',
    python_callable=check_sql_tables,
    provide_context=True,
    dag=dag)

checkBigQueryTable = BranchPythonOperator(
    task_id='checkBigQueryTable',
    python_callable=check_big_query_table,
    provide_context=True,
    dag=dag)

labBuilt = DummyOperator(
    task_id='labBuilt',
    dag=dag)

exceptionControl = ExecuteDagRunOperator(
    task_id='exceptionControl',
    execute_dag_id="SYS_exception_control",
    python_callable=mediation.dag_trigger_exception,
    trigger_rule='one_success',
    dag=dag)

# graphs
checkCloudFunctions >> checkSqlTables
checkCloudFunctions >> exceptionControl

checkSqlTables >> checkBigQueryTable
checkSqlTables >> exceptionControl

checkBigQueryTable >> labBuilt
checkBigQueryTable >> exceptionControl

问题是checkSqlTables应该遵循异常控制,但是它跳过了,DAG结束了。函数返回“exceptionControl”,我们可以在checkSqlTables日志中看到:

^{pr2}$

我也使用了触发器规则的属性(一次成功,虚拟的…),但它似乎不起作用。在

如果我删除第一步,它似乎可以工作,所以它似乎应该是我的dag的某种配置问题。在

enter image description here

你知道为什么checkSqlTables函数不分支到exceptionControl吗?在

编辑:在new deep reading to the Airflow Documentation中,我注意到如果一个步骤将一个任务标记为跳过,那么它将永远被跳过,因此我的代码将永远无法使用分支运算符。在

使用分支的解在每一步之前都包含一个虚拟步骤。但是我有一些DAG有超过10个步骤,这个模式将完全是一个混乱。在


Tags: 函数idtruetaskcheckcontextdagprovide