Python中文
首页
教程
问答
标签
搜索
登录
注册
上游跳跃时气流“无故障”跳过
回答此问题可获得
20
贡献值,回答如果被采纳可获得
50
分。
<p>我有一个工作流,其中我有两个并行进程(<code>sentinel_run</code>和<code>sentinel_skip</code>),它们应该根据条件运行或跳过,然后连接在一起(<code>resolve</code>)。我需要直接位于<code>sentinel_</code>任务下游的任务进行级联跳过,但是当它到达<code>resolve</code>任务时,<code>resolve</code>应该运行,除非上游的任一进程出现故障。在</p> <p>根据<a href="https://airflow.apache.org/concepts.html#trigger-rules" rel="nofollow noreferrer">documentation</a>,“无故障”触发规则应该起作用:</p> <blockquote> <p>none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped</p> </blockquote> <p>它也是一个<a href="https://stackoverflow.com/a/55743947/6591849">related question</a>的答案。在</p> <p>然而,当我实现了一个小例子时,我看到的并不是这样:</p> <pre class="lang-py prettyprint-override"><code>from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import ShortCircuitOperator from airflow.utils.dates import days_ago dag = DAG( "testing", catchup=False, schedule_interval="30 12 * * *", default_args={ "owner": "test@gmail.com", "start_date": days_ago(1), "catchup": False, "retries": 0 } ) start = DummyOperator(task_id="start", dag=dag) sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True) sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False) a = DummyOperator(task_id="a", dag=dag) b = DummyOperator(task_id="b", dag=dag) c = DummyOperator(task_id="c", dag=dag) d = DummyOperator(task_id="d", dag=dag) e = DummyOperator(task_id="e", dag=dag) f = DummyOperator(task_id="f", dag=dag) g = DummyOperator(task_id="g", dag=dag) resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed") start >> sentinel_run >> a >> b >> c >> resolve start >> sentinel_skip >> d >> e >> f >> resolve resolve >> g </code></pre> <p>此代码创建以下dag:</p> <p><a href="https://i.stack.imgur.com/0rdaA.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/0rdaA.png" alt="DAG Design"/></a></p> <p>问题是<code>resolved</code>任务应该执行(因为上游没有<code>upstream_failed</code>或{<cd9>}),但是它正在跳过。在</p> <p>我已经反省了数据库,没有隐藏任何失败或上游失败的任务,我不明白为什么它不遵守“无失败”逻辑。在</p> <p>我知道<a href="https://stackoverflow.com/q/51664755/6591849">"ugly workaround"</a>并在其他工作流中实现了它,但它增加了另一个要执行的任务,并增加了DAG的新用户必须摸索的复杂性(尤其是当您将其乘以多个任务时…)。这是我从气流1.8升级到气流1.10的主要原因,所以我希望我遗漏了一些明显的东西。。。在</p>
0 条评论
分类:
Python问答
请先
登录
后评论
默认排序
时间排序
1 个回答
匿名
1天前
擅长:python、mysql、java
<p>这似乎是气流中的一个缺陷。如果要修复,请将您的声音添加到<a href="https://issues.apache.org/jira/browse/AIRFLOW-4453" rel="nofollow noreferrer">https://issues.apache.org/jira/browse/AIRFLOW-4453</a>。在</p>
请先
登录
后评论
针对此问题:
更多的回答
关注
89
关注
收藏
1
收藏,
216
浏览
网友 提问于 2天前
相关Python问题
无法使用Django restfram生成PDF
6 回答
无法使用Django Rest框架发送压缩的gzip数据
8 回答
无法使用Django rest框架进行身份验证(请求用户=匿名用户)
5 回答
无法使用Django、Python和JavaScrip触发onclick函数
4 回答
无法使用Django.views.generic.View保存表单
3 回答
无法使用Django(python 2.7,OS X 10.11.1)
5 回答
无法使用Django/mongoengine连接到MongoDB(身份验证失败)
2 回答
无法使用Django\u mssql\u后端迁移到外部hos
9 回答
无法使用Django&Python3.4连接到MySql
5 回答
无法使用Django+nginx上载媒体文件
1 回答
无法使用Django1.6导入名称模式
6 回答
无法使用Django1.7和mongodb登录管理站点
1 回答
无法使用Djangoadmin创建项目,进程使用了错误的路径,因为我事先安装了错误的Python
10 回答
无法使用Djangockedi验证CBV中的字段
1 回答
无法使用Djangocketditor上载图像(错误400)
8 回答
无法使用Djangocron进行函数调用
3 回答
无法使用Djangofiler djang上载文件
8 回答
无法使用Djangokronos
2 回答
无法使用Djangomssql provid
1 回答
无法使用Djangomssql连接到带有Django 1.11的MS SQL Server 2016
8 回答