运行_,直到_完成失败,即使循环已停止

2024-05-23 17:46:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图编写一个SIGTERM处理程序,让我的永远运行(-loop

  • 停止接受新任务
  • 完成正在运行的任务
  • 关闭

这是我写的一个学习演示:

import asyncio
import signal
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)


class Looper:
    def __init__(self, loop):
        self._loop = loop
        self._shutdown = False
        signal.signal(signal.SIGINT, self._exit)
        signal.signal(signal.SIGTERM, self._exit)

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info(f"Received shutdown-signal: {sig} ({name})")
        self._shutdown = True
        self._loop.stop() # << Stopping the event loop here.
        _log.info(f"Loop stop initiated.")
        pending = asyncio.all_tasks(loop=self._loop)
        _log.info(f"Collected {len(pending)} tasks that have been stopped.")
        if pending:
            _log.info("Attempting to gather pending tasks: " + str(pending))
            gatherer_set = asyncio.gather(*pending, loop=self._loop)
            # self._loop.run_until_complete(gatherer_set) # << "RuntimeError: This event loop is already running"
        _log.info("Shutting down for good.")

    async def thumper(self, id, t):
        print(f"{id}: Winding up...")
        while not self._shutdown:
            await asyncio.sleep(t)
            print(f'{id}: Thump!')
        print(f'{id}: Thud.')


loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 2))
loop.create_task(lp.thumper('South Hall', 3))
loop.run_forever()
_log.info("Done.")

在Windows10和Debian10上,上面的脚本都会对SIGINT作出反应并生成输出

North Hall: Winding up...
South Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
09:55:53 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
09:55:53 INFO [__main__]: Loop stop initiated.
09:55:53 INFO [__main__]: Collected 2 tasks that have been stopped.
09:55:53 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91BF0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91C10>()]>>}
09:55:53 INFO [__main__]: Shutting down for good.
09:55:53 INFO [__main__]: Done.

不幸的是,“砰”一行表示砰砰(…)演示调用实际上 结论是,不会出现。我猜,这是因为“聚会”给了我一套 未实现的未来。但是,如果我敢激活则运行直到完成()- 行,即使它位于self.\u loop.stop()之后,输出 结束如下:

[...]
10:24:25 INFO [__main__]: Collected 2 tasks that have been stopped.
10:24:25 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E417D0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E41BF0>()]>>}
Traceback (most recent call last):
  File "amazing_grace.py", line 50, in <module>
    loop.run_forever()
  File "C:\Python37\lib\asyncio\base_events.py", line 539, in run_forever
    self._run_once()
  File "C:\Python37\lib\asyncio\base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "C:\Python37\lib\selectors.py", line 323, in select
    r, w, _ = self._select(self._readers, self._writers, [], timeout)
  File "C:\Python37\lib\selectors.py", line 314, in _select
    r, w, x = select.select(r, w, w, timeout)
  File "amazing_grace.py", line 35, in _exit
    self._loop.run_until_complete(gatherer_set) # << "This event loop is already running"
  File "C:\Python37\lib\asyncio\base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "C:\Python37\lib\asyncio\base_events.py", line 526, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

问题归结为

  • 在这种情况下,如何调用或替换运行\u直到\u完成(..),以及
  • 为什么我看到这个“循环正在运行”-停止循环后的错误

该程序应在Python 3.7上运行,同时在Windows 10Linux下运行

几天后编辑

正如zaquest在回答中所说的那样,如果只是分配一个信号处理程序并在其中添加一个create_task调用,就会自找麻烦;正如我观察到的,这个例程可能运行,也可能不运行(即使没有其他任务)。因此,现在我添加了一个sys.platform检查脚本是否在UNIX()下运行。如果是的话,我更喜欢使用更可靠的loop.add_signal_handler来定义回调函数,这正是我真正需要的。幸运的是,UNIX是我的主要用例。主线:

self._loop.add_signal_handler(signal.signal(signal.SIGINT, self._exit, signal.SIGINT, None)

为什么要进行平台检查?:在文档https://docs.python.org/3/library/asyncio-eventloop.html#unix-signals之后,loop.add_signal_handler()在Windows上不可用,考虑到所讨论的信号是UNIX行话,这并不奇怪


Tags: runinpyselfinfoloopasynciosignal
2条回答

Python信号处理程序在main thread中执行,在循环运行的同一线程中执行BaseEventLoop.stop()方法不会立即停止循环,而只是sets a flag,这样当下次循环运行时,它只执行已经调度的回调,而不会调度更多的回调(请参见run_forever)。但是,在信号处理程序返回之前,循环无法运行。这意味着您不能等到循环在信号处理程序中停止。相反,您可以安排另一个任务,等待长时间运行的任务对self._shutdown中的更改做出反应,然后停止循环

class Looper:
    ...

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info("Received shutdown-signal: %s (%s)", sig, name)
        self._shutdown = True

        pending = asyncio.all_tasks(loop=self._loop)
        _log.info("Attempting to gather pending tasks: " + str(pending))
        if pending:
            self._loop.create_task(self._wait_for_stop(pending))

    async def _wait_for_stop(self, tasks):
        await asyncio.gather(*tasks)
        self._loop.stop()  # << Stopping the event loop here.
        _log.info("Loop stop initiated.")

    ...

还有一件事需要提到的是,文档中说signal.signal()处理程序是not allowed与循环交互的,但没有说明原因(see

找到一个将从异步函数调用self.\u loop.stop()的解决方案 这将首先等待所有其他任务。请注意,它不会等待自己! 如果它尝试,程序将锁定

此外,asyncio.wait_for(..)co例程允许超时

import asyncio
import signal
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)


class Looper:
    def __init__(self, loop):
        self._loop = loop
        self._shutdown = False
        signal.signal(signal.SIGINT, self._exit)
        signal.signal(signal.SIGTERM, self._exit)

    async def _a_exit(self):
        self._shutdown = True
        my_task = asyncio.current_task()
        pending = list(filter(lambda x: x is not my_task, asyncio.all_tasks(loop=self._loop)))
        waiters = [asyncio.wait_for(p, timeout = 1.5, loop=self._loop) for p in pending]
        results = await asyncio.gather(*waiters, loop=self._loop, return_exceptions=True)
        n_failure = len(list(filter(lambda x: isinstance(x, Exception), results)))
        _log.info(f"{n_failure} failed processes when quick-gathering the remaining {len(results)} tasks. Stopping loop now.")
        self._loop.stop()

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info(f"Received shutdown-signal: {sig} ({name})")
        self._loop.create_task(self._a_exit())

    async def thumper(self, id, t):
        print(f"{id}: Winding up...")
        while not self._shutdown:
            await asyncio.sleep(t)
            print(f'{id}: Thump!')
        print(f'{id}: Thud.')


loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 1))
loop.create_task(lp.thumper('South Hall', 2))
loop.create_task(lp.thumper(' West Hall', 3))
loop.create_task(lp.thumper(' East Hall', 4))
loop.run_forever()
_log.info("Done.")

在Windows 10上,这可能导致输出

North Hall: Winding up...
South Hall: Winding up...
 West Hall: Winding up...
 East Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
[..]
South Hall: Thump!
North Hall: Thump!
14:20:59 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
 West Hall: Thump!
 West Hall: Thud.
North Hall: Thump!
North Hall: Thud.
South Hall: Thump!
South Hall: Thud.
14:21:01 INFO [__main__]: 1 failed processes when quick-gathering the remaining 4 tasks. Stopping loop now.
14:21:01 INFO [__main__]: Done.

失败的进程受到超时的影响

请注意,这解决了我的问题。但是,调用loop.stop()后,为什么loop.run\u直到\u complete(..)失败的问题仍然存在

相关问题 更多 >