以编程方式清除气流任务实例的状态

2024-06-07 18:35:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在DAG A完成执行时清除DAG B中的任务。A和B都是计划的DAG。在

是否有任何operator/方法来清除任务状态并以编程方式重新运行DAG B?在


我知道使用CLI option和webui选项来清除任务。在


Tags: 方法cli状态选项编程方式operator计划
3条回答

我建议你远离CLI!在

与通过basheoperator和/或CLI模块相比,dags/tasks的气流功能在引用对象时公开得更好。在

dag a添加一个名为“clear_dag_b”的python操作,该操作从dags文件夹(模块)导入dag_b,并执行以下操作:

from dags.dag_b import dag as dag_b

def clear_dag_b(**context):
   exec_date = context[some date object, I forget the name]
   dag_b.clear(start_date=exec_date, end_date=exec_date) 

重要!如果由于某种原因,不匹配重叠dag计划时间与开始日期/结束日期不匹配,clear()操作将错过dag执行。这个例子假设dagAB的调度是相同的,并且当A执行dayX时,您只想从B清除dayX

在清除之前,可能需要检查dag_b是否已运行:

^{pr2}$

由于我的目标是在DAG A完成执行时重新运行DAG B,因此我最终使用BashOperator清除了DAG B:

# Clear the tasks in another dag
last_task = BashOperator(
    task_id='last_task',
    bash_command= 'airflow clear example_target_dag -c ',
    dag=dag)

first_task >> last_task
  • ^{}是一个非常有用的地方,可以窥探SQLAlchemy的魔力。在
  • 实现^{}命令here
@cli_utils.action_logging
def clear(args):
    logging.basicConfig(
        level=settings.LOGGING_LEVEL,
        format=settings.SIMPLE_LOG_FORMAT)
    dags = get_dags(args)

    if args.task_regex:
        for idx, dag in enumerate(dags):
            dags[idx] = dag.sub_dag(
                task_regex=args.task_regex,
                include_downstream=args.downstream,
                include_upstream=args.upstream)

    DAG.clear_dags(
        dags,
        start_date=args.start_date,
        end_date=args.end_date,
        only_failed=args.only_failed,
        only_running=args.only_running,
        confirm_prompt=not args.no_confirm,
        include_subdags=not args.exclude_subdags,
        include_parentdag=not args.exclude_parentdag,
    )
  • 从源头来看,你可以
    • 复制它(假设您还想稍微修改一下功能)
    • 或者直接执行from airflow.bin import cli并直接调用所需的函数

相关问题 更多 >

    热门问题