为什么我的DAG不工作却标记为成功?

0 投票
1 回答
58 浏览
提问于 2025-04-14 15:26

我写了一个有向无环图(DAG),但是当我触发它的时候,什么都没有发生。不过在界面上我可以看到,它显示已经成功结束了。这里是我的代码:

from datetime import datetime
from sqlalchemy import create_engine, text
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator




with DAG(
    dag_id='test',
    start_date=datetime.today(),
    schedule_interval=None
) as dag:



    def _select():
        engine = create_engine("postgresql://postgres:123456@localhost:5433/main_base")
        try:
            with engine.connect() as conn:
                statement = conn.execute(text(
                    '''
                    INSERT INTO public.main("id", "name")
                    VALUES (5, "'med'");
                    '''
                ))
                conn.commit()
                return True
        except:
            return False


    insert_data = PythonOperator(
        task_id='insert_data',
        python_callable=_select,
        dag=dag,
    )





    insert_data 

我检查了Postgres的连接、表格等等,都是正常的,当我在开发环境中使用这个功能时,它是可以工作的。

1 个回答

0

这个 _select 函数不会因为有 try-except 这个块而出错。所以,你的任务总是能成功。不过,如果 _select 出现错误时你返回 False,那么在 Airflow 的 XCom 中你可能会看到 False,就像下面这个图所示。

enter image description here

如果你希望在出错时让任务失败,可以选择去掉 try-except,或者在出错时主动抛出一个错误。

撰写回答