气流:`'TaskInstance'对象在通过DagRun.get_task_instances()检索任务实例后没有属性'task'`

2024-04-29 12:19:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在运行composer-1.16.6-airflow-1.10.15

对于每日计划的DAG,我想编写一个自定义的on_failure_notification,它只在任务实例连续失败多天时发送通知。我的计划是获取dag运行的失败任务实例,并在最后一次成功执行日期检查每个实例:

def my_on_failure_notification(context):
    failed_tis = context["dag_run"].get_task_instances(state=State.FAILED)
    tis_to_notify_about = [ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)]

此操作失败,跟踪如下:

[...]
File "/home/airflow/gcs/dags/xxx.py", line 94, in my_on_failure_notification
ti.task_id for ti in failed_tis if ti.previous_execution_date_success < days_ago(2)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 625, in previous_execution_date_success
prev_ti = self._get_previous_ti(state=State.SUCCESS)
File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 582, in _get_previous_ti
dag = self.task.dag
AttributeError: 'TaskInstance' object has no attribute 'task'

我假设这是因为TIs是作为SQLAlchemy模型检索的,它不包含task属性。这是故意的行为吗?有没有建议的替代方案


Tags: 实例inpytaskfailureonlineti