如何在文件为空时停止Airflow DAG并跳过后续处理?

0 投票
1 回答
26 浏览
提问于 2025-04-13 12:32

我正在做一个Airflow的工作流(DAG),需要对一个文件进行处理,但前提是这个文件不能是空的。理想情况下,工作流应该先检查文件是否有内容,如果文件是空的,那么这个DAG就应该停止执行,跳过与这个文件相关的后续处理。

这是我Airflow DAG的简化结构:

from google.cloud import storage
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def check_file_not_empty():
    client = storage.Client()
    bucket = client.get_bucket(src_bucket_name)
    blob = bucket.get_blob(blob_name)
    if blob.size == 0:
        raise Exception(f"The file {blob_name} in bucket {src_bucket_name} is empty")

def process_file():
    # Code to process the file

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 21),
    'retries': 1,
}

dag = DAG('file_processing_dag', default_args=default_args, schedule_interval='@daily')

check_file_task = PythonOperator(
    task_id='check_file_not_empty',
    python_callable=check_file_not_empty,
    dag=dag,
)

process_file_task = PythonOperator(
    task_id='process_file',
    python_callable=process_file,
    dag=dag,
)

check_file_task >> process_file_task

看起来我需要调用一些Airflow内部的选项来停止执行,因为异常处理部分只会导致重试,而这不是我想要的。我希望能快速失败。请问我该怎么做呢?

1 个回答

0

你可以使用分支操作符,配合if/else条件来检查文件大小或者文件是否为空。下面是一个可以调用的示例代码:

def my_branch_func(ti) -> None:
    if file_has_content :
        return 'process_file' 
    else:
        return 'noop_task'

或者你提到的短路操作符,详细信息可以查看这个链接:https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#howto-operator-shortcircuitoperator

撰写回答