上游跳跃时气流“无故障”跳过

2024-05-10 01:06:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个工作流,其中我有两个并行进程(sentinel_runsentinel_skip),它们应该根据条件运行或跳过,然后连接在一起(resolve)。我需要直接位于sentinel_任务下游的任务进行级联跳过,但是当它到达resolve任务时,resolve应该运行,除非上游的任一进程出现故障。在

根据documentation,“无故障”触发规则应该起作用:

none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped

它也是一个related question的答案。在

然而,当我实现了一个小例子时,我看到的并不是这样:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

dag = DAG(
    "testing",
    catchup=False,
    schedule_interval="30 12 * * *",
    default_args={
        "owner": "test@gmail.com",
        "start_date": days_ago(1),
        "catchup": False,
        "retries": 0
    }
)

start = DummyOperator(task_id="start", dag=dag)

sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)

a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)

resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")

start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve

resolve >> g

此代码创建以下dag:

DAG Design

问题是resolved任务应该执行(因为上游没有upstream_failed或{}),但是它正在跳过。在

我已经反省了数据库,没有隐藏任何失败或上游失败的任务,我不明白为什么它不遵守“无失败”逻辑。在

我知道"ugly workaround"并在其他工作流中实现了它,但它增加了另一个要执行的任务,并增加了DAG的新用户必须摸索的复杂性(尤其是当您将其乘以多个任务时…)。这是我从气流1.8升级到气流1.10的主要原因,所以我希望我遗漏了一些明显的东西。。。在


Tags: runfromimportidtaskstartsentinelairflow
2条回答

把这件事记录下来是因为这个问题让我痛苦了两次,现在我解决了两次。在

问题分析

当您将日志级别设置为“调试”时,您将开始看到发生了什么:

[2019-10-09 18:30:05,472] {python_operator.py:114} INFO - Done. Returned value was: False
[2019-10-09 18:30:05,472] {python_operator.py:159} INFO - Condition result is False
[2019-10-09 18:30:05,472] {python_operator.py:165} INFO - Skipping downstream tasks...
[2019-10-09 18:30:05,472] {python_operator.py:168} DEBUG - Downstream task_ids [<Task(DummyOperator): f>, <Task(DummyOperator): g>, <Task(DummyOperator): d>, <Task(DummyOperator): resolve>, <Task(DummyOperator): e>]
[2019-10-09 18:30:05,492] {python_operator.py:173} INFO - Done.

从中,您可以看到问题不在于“none-unu-failed”错误地处理任务,而是模拟跳过条件的sentinel直接标记所有下游依赖项被直接跳过。这是shortcutuitOperator的一种行为—跳过所有下游任务,包括下游任务的下游任务。在

解决方案

解决这个问题的方法在于认识到是shortcutuitOperator的行为,而不是TriggerRule。一旦我们意识到这一点,就应该着手编写一个更适合我们实际要完成的任务的运算符了。在

我已经包括了我当前使用的操作符;我欢迎任何关于处理单个下游任务修改的更好方法的输入。我相信“跳过下一个,让其余的根据它们的触发规则级联”有一个更好的习惯用法,但是我已经花了太多的时间在这上面了,我怀疑答案更深层次地存在于内部。在

^{pr2}$

经过修改后,这将产生预期的dag结果:

Correct Dag

这似乎是气流中的一个缺陷。如果要修复,请将您的声音添加到https://issues.apache.org/jira/browse/AIRFLOW-4453。在

相关问题 更多 >