扩展Airflow DAG类 - 这是否是坏习惯?
我没有看到任何相关的例子,所以我在想,扩展DAG类是不是一种不好的做法。如果是的话,为什么呢?
接下来我会举个例子,说明这种做法可能有用的地方……
假设我们有几个DAG,它们都有相同的行为:在最后一步调用一个特定的函数,无论之前的步骤是成功还是失败。这个函数可能是调用某个外部API之类的。
我想的解决办法大致是这样的:
- 扩展DAG类,创建一个新的类叫DAGWithFinishAction
- 在DAGWithFinishAction中实现on_success_callback和on_failure_callback,以达到我想要的效果
- 在代码中使用这个新类,比如
with DAGWithFinishAction(dag_id=..., ...) as dag: ...
- 在每个实现的DAG中安排任务
- 期望每个DAG在所有任务完成后(无论结果如何)调用它的成功/失败回调
这种做法有什么问题吗?我找不到类似的例子,这让我觉得我可能漏掉了什么。
class DAGWithFinishAction(DAG):
def __init__(self, dag_id, **kwargs):
self.metric_callback = publish_execution_time
on_success_callback = kwargs.get("on_success_callback")
if on_success_callback is None:
on_success_callback = self.metric_callback
else:
if isinstance(on_success_callback, list):
on_success_callback.append(self.metric_callback)
else:
on_success_callback = [on_success_callback, self.metric_callback]
kwargs["on_success_callback"] = on_success_callback
super().__init__(dag_id, **kwargs)
with DAGWithFinishAction(dag_id=..., ...) as dag:
...
上面的代码是可以工作的,但我还是不确定这种做法是否应该避免,或者在设计DAG时是否是一个合理的方式。
0 个回答
暂无回答