multiprocessing.Event.wait在信号干扰时挂起
我使用以下代码来处理SIGINT事件。这个代码设置了一个多进程事件,用来“唤醒”正在等待的主线程。
import multiprocessing
import signal
class Class1(object):
_stop_event = multiprocessing.Event()
@staticmethod
def interrupt():
Class1._stop_event.set()
def wait(self):
print("Waiting for _stop_event")
if Class1._stop_event.wait(5):
print("_stop_event set.")
else:
print("Timed out.")
def stop(signum, frame):
print("Received SIG")
Class1.interrupt()
signal.signal(signal.SIGINT, stop)
c = Class1()
c.wait()
如果没有任何信号,wait方法会在10秒后超时,然后进程会正常退出,输出如下:
Waiting for _stop_event
Timed out.
当发送SIGINT信号时,信号被处理了,但event.wait方法既没有立即返回,也没有在超时后返回。进程永远不会退出。输出是:
Waiting for _stop_event
^CReceived SIG
我可以继续发送SIGINT信号。进程不会退出,输出是:
Waiting for _stop_event
^CReceived SIG
^CReceived SIG
^CReceived SIG
^CReceived SIG
....
如果我把Class1.wait方法换成检查event.is_set,所有事情就都正常了:
def wait(self):
print("Waiting for _stop_event")
while True:
if Class1._stop_event.is_set():
print("_stop_event set.")
break
进程会退出,输出是:
Waiting for _stop_event
^CReceived SIG
_stop_event set.
怎么才能让event.wait在事件被设置后返回呢?为什么wait方法连超时都不再发生了?
5 个回答
正如我在问题 #85772 中提到的:
我花了很多时间在这个问题上。我有一个叫做 multiprocessing.Event 的东西,它会让子进程的主线程休眠,还有一个中断处理程序会设置这个事件。根据这里的描述,当中断触发事件时,进程就会卡住。经过一些网上的研究,包括查看提到的 StackOverflow 话题,我找到了以下解决方案:
对于使用 fork 启动方法的 Linux:
- 在中断处理程序中,调用
threading.Thread(target=exit_event.set).start()
- 把 multiprocessing.Event 改成 threading.Event
对于使用 spawn 启动方法的 Linux:
- 在中断处理程序中,调用
threading.Thread(target=exit_event.set).start()
对于 Windows:
与其使用 signal.signal() 来改变中断处理程序,不如使用:
import win32api
win32api.SetConsoleCtrlHandler(handler, add)
参考链接: https://learn.microsoft.com/en-us/windows/console/setconsolectrlhandler
这个方法效果很好。关键的不同之处在于它在一个名为 Dummy-1 的线程上运行中断函数。
- (这不是一个很好的解决方案) 在中断处理程序中,调用
threading.Thread(target=exit_event.set).start()
- 这个方法解决了卡住的问题,但会引发一个 InterruptedError 异常
在Alex的出色研究基础上,这意味着从另一个线程设置标志不会导致死锁。如果把中断方法改成下面这样,原来的代码就能正常工作:
from threading import Thread
(...)
@staticmethod
def interrupt():
Thread(target=Class1._stop_event.set).start()
信号只能在主线程中处理。如果主线程在执行系统调用时被阻塞,那么这个系统调用就会引发一个InterruptedError(中断错误)。
根据Python文档的说法:
信号只能在Python解释器的“原子”指令之间发生。
比如说,time.sleep这个方法会引发一个InterruptedError。看起来event.wait方法在这种情况下处理得不太对,它不会引发InterruptedError,而是直接卡住。这在我看来像是Python的一个bug?
更新:
我发现这是在multiprocessing.Event中出现的死锁。如果主线程在等待一个事件,同时一个信号在被中断的主线程上设置了这个事件,那么multiprocessing.event.set()和multiprocessing.event.wait()这两个方法就会互相卡住。
而且,这种行为在不同平台上差别很大。例如,在Windows上,time.sleep()会引发一个InterruptedError,但在Linux上则会直接返回。
一个很笨重的解决办法是让主线程保持空闲,以便处理信号。
import multiprocessing
import signal
import threading
import time
class Class1(object):
_stop_event = multiprocessing.Event()
@staticmethod
def interrupt():
Class1._stop_event.set()
def wait_timeout(self):
print("Waiting for _stop_event")
if Class1._stop_event.wait(30):
print("_stop_event set.")
else:
print("Timeout")
def stop(signum, frame):
print("Received SIG")
Class1.interrupt()
exit_event.set()
def run():
c = Class1()
c.wait_timeout()
t = threading.Thread(target=run)
t.daemon = False
t.start()
exit_event = multiprocessing.Event()
signal.signal(signal.SIGINT, stop)
while not exit_event.is_set():
# Keep a main thread around to handle the signal and
# that thread must not be in a event.wait()!
try:
time.sleep(500)
except InterruptedError:
# We were interrupted by the incoming signal.
# Let the signal handler stop the process gracefully.
pass
这实在是太难看了。希望有人能提供一个更优雅的解决方案……
你们一定会喜欢这个。使用 threading.Event
,而不是 multiprocessing.Event
。这样当你按下 ^C
时,信号处理器就会像应该的那样被调用!
来源
import threading
import signal
class Class1(object):
_stop_event = threading.Event()
@staticmethod
def interrupt():
Class1._stop_event.set()
def wait(self):
print("Waiting for _stop_event")
if Class1._stop_event.wait(5):
print("_stop_event set.")
else:
print("Timed out.")
def stop(signum, frame):
print("Received SIG")
Class1.interrupt()
signal.signal(signal.SIGINT, stop)
c = Class1()
c.wait()
输出
Waiting for _stop_event
^CReceived SIG
_stop_event set.