我有一个气流DAG,结构如下。在
所有以“check*”开头的函数都是branchpythonooperator,而函数exceptionControl是一个ExecuteDagRunOperator,它接收每个错误以处理它们。在
这是DAG配置:
checkCloudFunctions = BranchPythonOperator(
task_id='checkCloudFunctions',
python_callable=check_cloud_functions,
provide_context=True,
dag=dag)
checkSqlTables = BranchPythonOperator(
task_id='checkSqlTables',
python_callable=check_sql_tables,
provide_context=True,
dag=dag)
checkBigQueryTable = BranchPythonOperator(
task_id='checkBigQueryTable',
python_callable=check_big_query_table,
provide_context=True,
dag=dag)
labBuilt = DummyOperator(
task_id='labBuilt',
dag=dag)
exceptionControl = ExecuteDagRunOperator(
task_id='exceptionControl',
execute_dag_id="SYS_exception_control",
python_callable=mediation.dag_trigger_exception,
trigger_rule='one_success',
dag=dag)
# graphs
checkCloudFunctions >> checkSqlTables
checkCloudFunctions >> exceptionControl
checkSqlTables >> checkBigQueryTable
checkSqlTables >> exceptionControl
checkBigQueryTable >> labBuilt
checkBigQueryTable >> exceptionControl
问题是checkSqlTables应该遵循异常控制,但是它跳过了,DAG结束了。函数返回“exceptionControl”,我们可以在checkSqlTables日志中看到:
^{pr2}$我也使用了触发器规则的属性(一次成功,虚拟的…),但它似乎不起作用。在
如果我删除第一步,它似乎可以工作,所以它似乎应该是我的dag的某种配置问题。在
你知道为什么checkSqlTables函数不分支到exceptionControl吗?在
编辑:在new deep reading to the Airflow Documentation中,我注意到如果一个步骤将一个任务标记为跳过,那么它将永远被跳过,因此我的代码将永远无法使用分支运算符。在
使用分支的解在每一步之前都包含一个虚拟步骤。但是我有一些DAG有超过10个步骤,这个模式将完全是一个混乱。在
目前没有回答
相关问题 更多 >
编程相关推荐