2024-04-24 08:33:30 发布
网友
我有一个dag,我使用下面的运算符列表
我的用例: 比如说,如果整个流程成功完成,我发现中间的数据处理有问题。我想从问题的角度重新运行特定执行日期的作业。我清理了下游,它使工作重新运行。但是,TriggerDagrunoperator因以下问题而失败。在
在airflow.exceptions.DagRunAlreadyExists:dag id已触发的运行id已存在
我想清除这一点,并需要为特定的执行日期再次运行dag。有什么更好的方法来实现这一点?在
使用以下步骤:
DagRun.find()
dag.clear()
dag_to_rerun = ... trigger_dag = ... def delete_previous_dagrun_func(dag_to_delete: airflow.DAG, **context): execution_date = context.get('execution_date') previous_dagruns = DagRun.find(dag_id=dag_to_delete.dag_id, execution_date=execution_date) if previous_dagruns: dag_to_delete.clear( start_date=execution_date, end_date=execution_date, ) return 'no_op' else: return 'trigger' delete_previous_dagrun = PythonBranchOperator( task_id='delete_previous_dagrun', dag=trigger_dag, python_callable=delete_previous_dagrun_func, op_args=(dag_to_rerun,), provide_context=True, ) trigger = TriggerDagRunOperator( task_id='trigger', dag=trigger_dag, trigger_dag_id=dag_to_rerun.dag_id, ) no_op = DummyOperator( task_id='no_op', dag=trigger_dag, ) delete_previous_dagrun >> [trigger, no_op]
使用以下步骤:
DagRun.find()
相同的DagRundag.clear()
从数据库中删除DagRun。我们不需要运行DagRun,因为清除的DagRun将自动重新运行。在相关问题 更多 >
编程相关推荐