扩展Airflow DAG类 - 这是否是坏习惯?

0 投票
0 回答
46 浏览
提问于 2025-04-12 06:44

我没有看到任何相关的例子,所以我在想,扩展DAG类是不是一种不好的做法。如果是的话,为什么呢?

接下来我会举个例子,说明这种做法可能有用的地方……

假设我们有几个DAG,它们都有相同的行为:在最后一步调用一个特定的函数,无论之前的步骤是成功还是失败。这个函数可能是调用某个外部API之类的。

我想的解决办法大致是这样的:

  • 扩展DAG类,创建一个新的类叫DAGWithFinishAction
  • 在DAGWithFinishAction中实现on_success_callback和on_failure_callback,以达到我想要的效果
  • 在代码中使用这个新类,比如with DAGWithFinishAction(dag_id=..., ...) as dag: ...
  • 在每个实现的DAG中安排任务
  • 期望每个DAG在所有任务完成后(无论结果如何)调用它的成功/失败回调

这种做法有什么问题吗?我找不到类似的例子,这让我觉得我可能漏掉了什么。

class DAGWithFinishAction(DAG):

    def __init__(self, dag_id, **kwargs):
        self.metric_callback = publish_execution_time

        on_success_callback = kwargs.get("on_success_callback")
        if on_success_callback is None:
            on_success_callback = self.metric_callback
        else:
            if isinstance(on_success_callback, list):
                on_success_callback.append(self.metric_callback)
            else:
                on_success_callback = [on_success_callback, self.metric_callback]

        kwargs["on_success_callback"] = on_success_callback
        super().__init__(dag_id, **kwargs)

with DAGWithFinishAction(dag_id=..., ...) as dag:
    ...

上面的代码是可以工作的,但我还是不确定这种做法是否应该避免,或者在设计DAG时是否是一个合理的方式。

0 个回答

暂无回答

撰写回答