气流:ExternalTaskSensor未按预期工作。不同的任务时间表

2024-04-26 11:02:51 发布

您现在位置:Python中文网/ 问答频道 /正文

同事们,我们需要帮助。有两个DAG父级和子级,父级有自己的计划,假设“30****”,子级“18-17**1-5”,子级等待父级执行,例如40分钟,如果父级以错误结束,则子级也会因错误而崩溃,否则将执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不知道如何同步它们。我编写了如下代码:

Dag父级

import time

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor  import ExternalTaskSensor, ExternalTaskMarker

start_date =  datetime(2021, 3, 1, 20, 36, 0)

class Exept(Exception):
    pass

def wait():
    time.sleep(3)
    with open('etl.txt', 'r') as txt:
        line = txt.readline()
        if line == 'err':
            print(1)
            raise Exept
    return 'etl success'


with DAG(
    dag_id="dag_etl1",
    start_date=start_date,
    schedule_interval='* * * * *',
    tags=['example2'],
) as etl1:
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="dag_etl1",
        external_task_id="etl_child",
    )
    wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)
    
    wait_timer >> parent_task

Dag子项

from datetime import datetime, timedelta


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor  import ExternalTaskSensor, ExternalTaskMarker

from etl_parent import etl1, wait_timer, parent_task

start_date =  datetime(2021, 3, 1, 20, 36, 0)

def check():
    return 'I succeeded'

with DAG(
    dag_id='etl_child', 
    start_date=start_date, 
    schedule_interval='* * * * *',
    tags = ['testing_child_dag']
) as etl_child:
    status = ExternalTaskSensor(
        task_id="dag_etl1",
        external_dag_id=etl1.dag_id,
        external_task_id=parent_task.task_id,
        allowed_states=['success'],
        mode='reschedule',
        execution_delta=timedelta(minutes=1),
        timeout=60,
    )

    task1 = PythonOperator(task_id='task1', python_callable=check)
    
    status >> task1

如您所见,我试图模拟当文本文件中指定了err并且在任何其他情况下成功时父任务失败的情况。但这一点也不象我所期望的那样,在dag的第一个开始,一切正常,工作正常,如果我更改文本文件中的数据,那么父任务工作正常,例如,我启动父dag时故意出错,一切正常,子类将以错误结束,但是如果我更改文本,同样,家长会正确地工作,但孩子会继续跌倒一段时间,那么这可能是正确的,但不是事实。如果发射成功,情况是一样的,恰恰相反。此外,我不明白如何在父dag中组织等待任务完成的任务

请帮助)我最近一直在使用气流,我可能遗漏了一些东西


Tags: fromimportidtaskdateetlstartexternal
1条回答
网友
1楼 · 发布于 2024-04-26 11:02:51

ExternalTaskSensor出现问题的最常见原因是执行_delta参数,因此我将从这里开始

我看到父dag和子dag都有完全相同的开始日期和计划间隔,但您的执行增量为1分钟。在本例中,您的子dag查找在20:35开始的父dag(在您的示例中),但它实际上在20:36开始,因此失败。即使对于测试,也要尝试将父dag设置为20:35开始,看看它是否解决了问题

这里有一篇很好的文章详细介绍了schedule_interval陷阱https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142

关于等待时间,这是ExternalTaskSensor中的超时参数。在您的情况下,它会等待60秒才失败。就我个人而言,我会非常谨慎地设置一个较长的超时时间。当您的传感器正在等待时,它占用了一个工作线程,因此没有其他任务可以使用它,这可能会导致您的其他任务被锁定,无法执行,特别是如果您有很多传感器

相关问题 更多 >