如何重新读取已经使用TriggerDagrunoperator执行的dag?

2024-04-24 08:33:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个dag,我使用下面的运算符列表

  • 触发DagRunOperator触发另一个dag
  • ExternalTaskSensor获取已触发dag的状态

我的用例: 比如说,如果整个流程成功完成,我发现中间的数据处理有问题。我想从问题的角度重新运行特定执行日期的作业。我清理了下游,它使工作重新运行。但是,TriggerDagrunoperator因以下问题而失败。在

在airflow.exceptions.DagRunAlreadyExists:dag id已触发的运行id已存在

我想清除这一点,并需要为特定的执行日期再次运行dag。有什么更好的方法来实现这一点?在


Tags: id列表状态作业运算符流程用例exceptions
1条回答
网友
1楼 · 发布于 2024-04-24 08:33:30

使用以下步骤:

  1. 确定是否需要删除上一个执行日期与DagRun.find()相同的DagRun
  2. 分支。。。
    1. 如果执行了DagRun,请通过dag.clear()从数据库中删除DagRun。我们不需要运行DagRun,因为清除的DagRun将自动重新运行。在
    2. 否则,触发DagRun。在
dag_to_rerun = ...
trigger_dag = ...

def delete_previous_dagrun_func(dag_to_delete: airflow.DAG, **context):
    execution_date = context.get('execution_date')
    previous_dagruns = DagRun.find(dag_id=dag_to_delete.dag_id,
                                   execution_date=execution_date)
    if previous_dagruns:
        dag_to_delete.clear(
            start_date=execution_date,
            end_date=execution_date,
        )
        return 'no_op'
    else:
        return 'trigger'

delete_previous_dagrun = PythonBranchOperator(
    task_id='delete_previous_dagrun',
    dag=trigger_dag,
    python_callable=delete_previous_dagrun_func,
    op_args=(dag_to_rerun,),
    provide_context=True,
)
trigger = TriggerDagRunOperator(
    task_id='trigger',
    dag=trigger_dag,
    trigger_dag_id=dag_to_rerun.dag_id,
)
no_op = DummyOperator(
    task_id='no_op',
    dag=trigger_dag,
)
delete_previous_dagrun >> [trigger, no_op]

相关问题 更多 >