是否可以动态更改DAG的重试次数
想象一个简单的dag:
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def conditonnal_retry(value=True):
if value:
return "retry should occur if dag run fails"
else:
return "no need for a retry if dag run fails."
args = {
'owner': 'airflow',
'start_date': days_ago(2),
'retries': 3,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
dag_id='example',
default_args=args,
schedule_interval=None,
tags=['example']
)
run_this = PythonOperator(
task_id='conditonnal_retry',
python_callable=conditonnal_retry,
dag=dag,
)
run_this
我的目标是拥有一个函数,用于检查重试与否是否最糟糕,并禁用此特定dag_运行的重试功能。
它可能标志着dag_运行失败或成功。我只想在满足某些条件时避免低效的重试
可能吗
这可能有用
您可以根据需要随时更新参数,但我建议您不要这样做,因为这可能会更改DAG行为并影响正在运行的工作流
如果您仍然想这样做,那么
Xcom
将有所帮助。 http://airflow.incubator.apache.org/concepts.html#xcoms您还可以查看
ShortCircuitOperator
,并使用它跳过所有下游任务。一些链接:ShortCircuitOperator
example_short_circuit_operator.py
但是在编写实现之前,请检查本文中解释的一些缺点:https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/
相关问题 更多 >
编程相关推荐