气流外部传感器Stu

2024-04-25 07:25:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图让气流外部任务传感器的工作,但到目前为止,还没有能够得到它来完成,它似乎总是卡住运行,从来没有完成,所以DAG可以进入下一个任务。你知道吗

下面是我用来测试的代码:


DEFAULT_ARGS = {
    'owner': 'NAME',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

external_watch_dag = DAG(
    'DAG-External_watcher-Test',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

start_op = DummyOperator(
    task_id='start_op',
    dag=external_watch_dag
)


trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_delta=timedelta(minutes=-1),
    # execution_date_fn=datetime(2019, 9, 25),
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

end_op = DummyOperator(
    task_id='end_op',
    dag=external_watch_dag
)

start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op


# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
    'DAG-Dummy',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

dummy_task = BashOperator(
    task_id='dummy_task',
    bash_command='sleep 10',
    dag=dummy_dag
)

我尝试了很多方法来调整这段代码,但是在使用ExternalTaskSensor时没有取得任何成功。你知道吗

有人知道如何解决这个问题并让ExternalTaskSensor正常工作吗?我还了解到,在使用ExternalTaskSensor时,调度时间间隔可能会产生问题,是否部分问题是dag都有schedule_interval=None?你知道吗

我将这两个dag设置为完全相同的schedule_interval,但这在生产中不起作用。目标是使主DAG,即外部监视DAG,在运行过程中定期触发,DAG虚拟对象,而DAG虚拟对象本身具有schedule_interval=None。你知道吗

非常感谢您的帮助。你知道吗


Tags: noneidtaskstartexternaldummytimedeltawatch