条件dag运行重试

2024-04-27 21:06:40 发布

您现在位置:Python中文网/ 问答频道 /正文

是否可以动态更改DAG的重试次数

想象一个简单的dag:

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def conditonnal_retry(value=True):
    if value:
        return "retry should occur if dag run fails"
    else:
        return "no need for a retry if dag run fails."

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
    'retries': 3,
    'retry_delay': timedelta(minutes=15)
}

dag = DAG(
    dag_id='example',
    default_args=args,
    schedule_interval=None,
    tags=['example']
)

run_this = PythonOperator(
    task_id='conditonnal_retry',
    python_callable=conditonnal_retry,
    dag=dag,
)

run_this

我的目标是拥有一个函数,用于检查重试与否是否最糟糕,并禁用此特定dag_运行的重试功能。
它可能标志着dag_运行失败或成功。我只想在满足某些条件时避免低效的重试

可能吗


Tags: runfromimportreturnifvalueargsago
2条回答

这可能有用

您可以根据需要随时更新参数,但我建议您不要这样做,因为这可能会更改DAG行为并影响正在运行的工作流

如果您仍然想这样做,那么Xcom将有所帮助。 http://airflow.incubator.apache.org/concepts.html#xcoms

您还可以查看ShortCircuitOperator,并使用它跳过所有下游任务。一些链接:

但是在编写实现之前,请检查本文中解释的一些缺点:https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/

相关问题 更多 >