我有一个工作流,其中我有两个并行进程(sentinel_run
和sentinel_skip
),它们应该根据条件运行或跳过,然后连接在一起(resolve
)。我需要直接位于sentinel_
任务下游的任务进行级联跳过,但是当它到达resolve
任务时,resolve
应该运行,除非上游的任一进程出现故障。在
根据documentation,“无故障”触发规则应该起作用:
none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
它也是一个related question的答案。在
然而,当我实现了一个小例子时,我看到的并不是这样:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago
dag = DAG(
"testing",
catchup=False,
schedule_interval="30 12 * * *",
default_args={
"owner": "test@gmail.com",
"start_date": days_ago(1),
"catchup": False,
"retries": 0
}
)
start = DummyOperator(task_id="start", dag=dag)
sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)
a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)
resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")
start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve
resolve >> g
此代码创建以下dag:
问题是resolved
任务应该执行(因为上游没有upstream_failed
或{
我已经反省了数据库,没有隐藏任何失败或上游失败的任务,我不明白为什么它不遵守“无失败”逻辑。在
我知道"ugly workaround"并在其他工作流中实现了它,但它增加了另一个要执行的任务,并增加了DAG的新用户必须摸索的复杂性(尤其是当您将其乘以多个任务时…)。这是我从气流1.8升级到气流1.10的主要原因,所以我希望我遗漏了一些明显的东西。。。在
把这件事记录下来是因为这个问题让我痛苦了两次,现在我解决了两次。在
问题分析
当您将日志级别设置为“调试”时,您将开始看到发生了什么:
从中,您可以看到问题不在于“none-unu-failed”错误地处理任务,而是模拟跳过条件的sentinel直接标记所有下游依赖项被直接跳过。这是shortcutuitOperator的一种行为—跳过所有下游任务,包括下游任务的下游任务。在
解决方案
解决这个问题的方法在于认识到是shortcutuitOperator的行为,而不是TriggerRule。一旦我们意识到这一点,就应该着手编写一个更适合我们实际要完成的任务的运算符了。在
我已经包括了我当前使用的操作符;我欢迎任何关于处理单个下游任务修改的更好方法的输入。我相信“跳过下一个,让其余的根据它们的触发规则级联”有一个更好的习惯用法,但是我已经花了太多的时间在这上面了,我怀疑答案更深层次地存在于内部。在
^{pr2}$经过修改后,这将产生预期的dag结果:
这似乎是气流中的一个缺陷。如果要修复,请将您的声音添加到https://issues.apache.org/jira/browse/AIRFLOW-4453。在
相关问题 更多 >
编程相关推荐