如何使用on_failure参数处理Prefict中的任务失败并返回成功?

2024-04-25 21:46:54 发布

您现在位置:Python中文网/ 问答频道 /正文

我在{}中有一个{},它的输出是一个{}。在下面提供的示例中,它总是失败。我希望task使用@task(on_failure=handle_task_fail)返回状态为SUCCESS的空dataframe。实现这一点的正确语法是什么

from pprint import pprint
import pandas as pd

from prefect import Flow, task
from prefect.engine.signals import SUCCESS


def handle_disambig_error(task, old_state, new_state):
    if new_state.is_failed():
        new_state.result["wiki_df"] = pd.DataFrame()

        # Is this needed?
        #set state to SUCCESS
    return new_state


@task(on_failure=handle_disambig_error)
def get_wiki_resource():

    wiki_df = pd.DataFrame(
        {
            "a":[1],
            "b":[1/0]
        }
    )

    return wiki_df

with Flow("Always Fail") as flow:
    wiki_df = get_wiki_resource()

state = flow.run()
task_state = state.result[wiki_df]
pprint(task_state.result)

回溯:

Traceback (most recent call last):
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 120, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/miniconda3/lib/python3.7/site-packages/prefect/utilities/notifications.py", line 69, in state_handler
    fn(obj, new_state)
TypeError: handle_disambig_error() missing 1 required positional argument: 'new_state'
[2020-01-28 17:39:41,759] INFO - prefect.TaskRunner | Task 'get_wiki_resource': finished task run for task with final state: 'Failed'
[2020-01-28 17:39:41,762] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

我搜索了一些地方State HandlersLogging with a State Handler


Tags: fromimportdfnewtaskwikiflowengine
1条回答
网友
1楼 · 发布于 2024-04-25 21:46:54

这里有两件事:

1.)通用状态处理程序:可以通过state_handlerskwarg设置这些处理程序,并在每次状态更改时调用它们。状态处理程序需要有一个签名state_handler(task: Task, old_state: State, new_state: State) -> Optional[State](您正在使用的签名);调用此处理程序后,任务的状态将是从处理程序返回的状态,如果返回None,则为new_state

2.)关于失败回调:您在这里使用的on_failurekwarg旨在为状态处理程序提供一个方便的API;传递给此关键字的函数必须具有签名fn(task: Task, state: State) -> None,并且仅当此任务进入Failed状态时才会调用。请注意,失败时回调不能像状态处理程序那样改变任务的状态

在您的示例中,您似乎混合了两个关键字参数。我相信下面的代码将满足您的期望:

from prefect.engine.state import Success


def handle_disambig_error(task, old_state, new_state):
    if new_state.is_failed():
        return_state = Success(result=pd.DataFrame())
    else:
        return_state = new_state
    return return_state

@task(state_handlers=[handle_disambig_error])
def get_wiki_resource():
   return df

相关问题 更多 >