什么时候打电话合适循环.关闭()?

2024-04-20 13:56:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经用asyncio做了一段时间的实验,阅读了PEPs;一些教程;甚至还有{a2}。在

我想我已经掌握了窍门,但是我仍然对loop.close()的行为感到困惑,我不太清楚什么时候可以“安全”地调用它。在

简而言之,我的用例是一堆阻塞的“老派”调用,我将它们封装在run_in_executor()和一个外部协同程序中;如果这些调用中有任何一个出错,我希望停止进程,取消那些尚未完成的调用,打印一个合理的日志,然后(希望,干净地)离开。在

比如说,像这样:

import asyncio
import time


def blocking(num):
    time.sleep(num)
    if num == 2:
        raise ValueError("don't like 2")
    return num


async def my_coro(loop, num):
    try:
        result = await loop.run_in_executor(None, blocking, num)
        print(f"Coro {num} done")
        return result
    except asyncio.CancelledError:
        # Do some cleanup here.
        print(f"man, I was canceled: {num}")


def main():
    loop = asyncio.get_event_loop()
    tasks = []
    for num in range(5):
        tasks.append(loop.create_task(my_coro(loop, num)))

    try:
        # No point in waiting; if any of the tasks go wrong, I
        # just want to abandon everything. The ALL_DONE is not
        # a good solution here.
        future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(future)
        if pending:
            print(f"Still {len(pending)} tasks pending")
            # I tried putting a stop() - with/without a run_forever()
            # after the for - same exception raised.
            #  loop.stop()
            for future in pending:
                future.cancel()

        for task in done:
            res = task.result()
            print("Task returned", res)
    except ValueError as error:
        print("Outer except --", error)
    finally:
        # I also tried placing the run_forever() here,
        # before the stop() - no dice.
        loop.stop()
        if pending:
            print("Waiting for pending futures to finish...")
            loop.run_forever()
        loop.close()

我尝试了stop()run_forever()调用的几种变体,“永远先运行,然后停止”似乎是根据to the pydoc使用的方法,如果不调用close(),则会产生令人满意的结果:

^{pr2}$

但是,当添加对close()的调用时(如上所示),我得到两个异常:

exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

这充其量是令人恼火的,但对我来说,完全令人费解:而且,更糟糕的是,我一直无法找到处理这种情况的正确方法。在

因此,有两个问题:

  • 我错过了什么?我应该如何修改上面的代码,以使包含对close()的调用不会引发?

  • 如果我不调用close()-在这个微不足道的例子中,我认为它在很大程度上是多余的;但是在“真正的”生产代码中会有什么后果呢?

为了我个人的满意,还:

  • 为什么会上涨?循环还想从coro/tasks中得到什么:它们要么退出;要么被提升;要么被取消:这不足以让它满意吗?在

非常感谢您的建议!在


Tags: theruninloopasyncioforcloseif
2条回答

Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the run_in_executor() and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding

这无法按预期工作,因为run_in_executor将函数提交给线程池,而操作系统线程在Python(或其他公开它们的语言)中无法取消。取消run_in_executor返回的future将尝试取消底层的{a1},但这只在阻塞函数尚未运行时生效,例如,由于线程池正忙。一旦它开始执行,就不能安全地取消它。与线程相比,支持安全可靠的取消是使用asyncio的好处之一。在

如果您正在处理同步代码,不管是旧的阻塞调用还是运行时间较长的CPU绑定代码,您应该使用run_in_executor来运行它,并采用一种方法来中断它。例如,代码偶尔会检查stop_requested标志,如果是真的,可能会通过引发异常来退出。然后,您可以通过设置适当的标志来“取消”这些任务。在

how should I modify the code above in a way that with the call to close() included does not raise?

据我所知,如果不修改blocking和顶层代码,目前无法做到这一点。run_in_executor将坚持通知事件循环结果,当事件循环关闭时,这将失败。取消asyncio-future并没有帮助,因为取消检查是在事件循环线程中执行的,而错误发生在工作线程调用call_soon_threadsafe之前。(可以将检查移动到工作线程,但应该仔细分析它是否导致对cancel()的调用与实际检查之间的竞争条件。)

why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?

它希望传递给run_in_executor(问题中的字面意思是blocking)的阻塞函数在关闭事件循环之前完成运行。您取消了asyncio-future,但是底层的concurrent-future仍然希望“呼叫总部”,发现循环已关闭。在

这是否是asyncio中的一个bug,或者在确保提交给run_in_executor的所有工作完成之前,不应该关闭事件循环。这样做需要进行以下更改:

  • 不要试图取消待定的期货。从表面上看,取消它们看起来是正确的,但它阻止了您对这些未来的wait(),因为asyncio会认为它们是完整的。在
  • 相反,向后台任务发送一个特定于应用程序的事件,通知它们需要中止。在
  • 在^{之前调用loop.run_until_complete(asyncio.wait(pending))。在

通过这些修改(除了特定于应用程序的事件-我只是让sleep()完成他们的过程),异常没有出现。在

what actually happens if I don't call close() - in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?

由于典型的事件循环与应用程序的运行时间一样长,因此在程序的最后不调用close()应该没有问题。无论如何,操作系统将在程序退出时清理资源。在

调用loop.close()对于具有明确生存期的事件循环非常重要。例如,库可能会为特定任务创建一个新的事件循环,在专用线程中运行它,然后释放它。未能关闭这样的循环可能会泄漏其内部资源(例如它用于线程间唤醒的管道)并导致程序失败。另一个例子是测试套件,它通常为每个单元测试启动一个新的事件循环,以确保测试环境的分离。在


编辑:Ifiled a bug此问题。
编辑2:这个bug是由开发人员fixed造成的。

在修复upstream issue之前,解决此问题的另一种方法是使用没有缺陷的自定义版本替换{}。虽然一开始自己的run_in_executor听起来是个坏主意,但实际上它只是^{}和{a3}未来之间的一个小粘合剂。在

可以使用这两个类的公共API清晰地实现run_in_executor的简单版本:

def run_in_executor(executor, fn, *args):
    """Submit FN to EXECUTOR and return an asyncio future."""
    loop = asyncio.get_event_loop()
    if args:
        fn = functools.partial(fn, *args)
    work_future = executor.submit(fn)
    aio_future = loop.create_future()
    aio_cancelled = False

    def work_done(_f):
        if not aio_cancelled:
            loop.call_soon_threadsafe(set_result)

    def check_cancel(_f):
        nonlocal aio_cancelled
        if aio_future.cancelled():
            work_future.cancel()
            aio_cancelled = True

    def set_result():
        if work_future.cancelled():
            aio_future.cancel()
        elif work_future.exception() is not None:
            aio_future.set_exception(work_future.exception())
        else:
            aio_future.set_result(work_future.result())

    work_future.add_done_callback(work_done)
    aio_future.add_done_callback(check_cancel)

    return aio_future

loop.run_in_executor(blocking)run_in_executor(executor, blocking)替换时,executor是在main()中创建的^{},代码可以工作而不需要其他修改。在

当然,在这个变体中,同步函数将继续在另一个线程中运行直到完成,尽管被取消了,但是如果不修改它们来支持显式中断,这是不可避免的。在

相关问题 更多 >