失败时重新运行函数
reloadable的Python项目详细描述
可重载
函数my_func将无限期运行,直到停止引发异常为止, 在这种情况下是不会发生的。
fromreloadableimportreloadable@reloadable()defmy_func():raiseException('Oops')
当我们想永远运行某个程序(如代码)时,此模块非常有用 连接到队列的获取消息。最终它可能会断开 尝试获取消息时引发错误,以便可重新加载可以重试连接。
@reloadable()defget_message():conn=Queue(host='...',password='...')whileTrue:message=conn.fetch_message()# probably process message afterwards...
您可以配置接收异常的回调函数,该异常将 如果发生则调用。
defshit_happens(exception):logger.exception(exception)@reloadable(exception_callback=shit_happens)defdont_stop():raiseException('Deal with it')
您也可以在下次重生之前等待一段时间
@reloadable(sleep_time=7)# wait 7 seconds before running `get_message` after a failuredefget_message():# some code...
始终可以使用KeyboardInterrupt异常停止可重新加载 (通常由^C触发,但不一定)。
另一个选项是配置停止条件异常。
@reloadable(stop_condition_exception=ValueError)defi_will_stop():raiseValueError('something went wrong')
或者您可以全局定义它,如果未定义本地停止条件,则将使用它
fromreloadableimportreloadable,configureconfigure(stop_condition_exception=KeyError)@reloadable()defi_will_stop():raiseKeyError('...')
您可能还希望限制装饰器应该尝试的次数 重新运行函数。如果函数在没有 成功,它会引发最后一个错误。
fromreloadableimportreloadable@reloadable(max_reloads=2)defa_func():raiseKeyError('...')
或者,您可以通过配置禁用可重新加载的decorator, 这在单元测试中很有用。
fromreloadableimportconfigure,reloadableconfigure(enabled=False)@reloadable()# When disabled, it does nothingdefi_am_free():return'\o/'
出现错误时重试
如果要在上重试某个操作,@retry_on_errordecorator非常有用 错误,但在修饰函数完成后返回结果 执行成功。
importrequestsfromreloadable.decoratorsimportretry_on_error@retry_on_error(max_reloads=3)defmy_request():response=requests.get("https://www.sieve.com.br")# raises an error for 4xx and 5xx status codesresponse.raise_for_status()returnresponse.content
测试
python -m unittest -v tests
安装
pip install reloadable