2024-06-07 18:35:30 发布
网友
我想在DAG A完成执行时清除DAG B中的任务。A和B都是计划的DAG。在
是否有任何operator/方法来清除任务状态并以编程方式重新运行DAG B?在
operator
我知道使用CLI option和webui选项来清除任务。在
我建议你远离CLI!在
与通过basheoperator和/或CLI模块相比,dags/tasks的气流功能在引用对象时公开得更好。在
向dag a添加一个名为“clear_dag_b”的python操作,该操作从dags文件夹(模块)导入dag_b,并执行以下操作:
from dags.dag_b import dag as dag_b def clear_dag_b(**context): exec_date = context[some date object, I forget the name] dag_b.clear(start_date=exec_date, end_date=exec_date)
重要!如果由于某种原因,不匹配或重叠dag计划时间与开始日期/结束日期不匹配,clear()操作将错过dag执行。这个例子假设dagA和B的调度是相同的,并且当A执行dayX时,您只想从B清除dayX
在清除之前,可能需要检查dag_b是否已运行:
由于我的目标是在DAG A完成执行时重新运行DAG B,因此我最终使用BashOperator清除了DAG B:
# Clear the tasks in another dag last_task = BashOperator( task_id='last_task', bash_command= 'airflow clear example_target_dag -c ', dag=dag) first_task >> last_task
SQLAlchemy
@cli_utils.action_logging def clear(args): logging.basicConfig( level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) dags = get_dags(args) if args.task_regex: for idx, dag in enumerate(dags): dags[idx] = dag.sub_dag( task_regex=args.task_regex, include_downstream=args.downstream, include_upstream=args.upstream) DAG.clear_dags( dags, start_date=args.start_date, end_date=args.end_date, only_failed=args.only_failed, only_running=args.only_running, confirm_prompt=not args.no_confirm, include_subdags=not args.exclude_subdags, include_parentdag=not args.exclude_parentdag, )
from airflow.bin import cli
我建议你远离CLI!在
与通过basheoperator和/或CLI模块相比,dags/tasks的气流功能在引用对象时公开得更好。在
向dag a添加一个名为“clear_dag_b”的python操作,该操作从dags文件夹(模块)导入dag_b,并执行以下操作:
重要!如果由于某种原因,不匹配或重叠dag计划时间与开始日期/结束日期不匹配,clear()操作将错过dag执行。这个例子假设dagA和B的调度是相同的,并且当A执行dayX时,您只想从B清除dayX
在清除之前,可能需要检查dag_b是否已运行:
^{pr2}$由于我的目标是在DAG A完成执行时重新运行DAG B,因此我最终使用BashOperator清除了DAG B:
SQLAlchemy
的魔力。在from airflow.bin import cli
并直接调用所需的函数相关问题 更多 >
编程相关推荐