multiprocessing.Event.wait在信号干扰时挂起

5 投票
5 回答
3536 浏览
提问于 2025-04-18 11:07

我使用以下代码来处理SIGINT事件。这个代码设置了一个多进程事件,用来“唤醒”正在等待的主线程。

import multiprocessing
import signal

class Class1(object):
    _stop_event = multiprocessing.Event()

    @staticmethod
    def interrupt():
        Class1._stop_event.set()

    def wait(self):
        print("Waiting for _stop_event")
        if Class1._stop_event.wait(5):
            print("_stop_event set.")
        else:
            print("Timed out.")

def stop(signum, frame):
    print("Received SIG")
    Class1.interrupt()

signal.signal(signal.SIGINT, stop)

c = Class1()
c.wait()

如果没有任何信号,wait方法会在10秒后超时,然后进程会正常退出,输出如下:

Waiting for _stop_event
Timed out.

当发送SIGINT信号时,信号被处理了,但event.wait方法既没有立即返回,也没有在超时后返回。进程永远不会退出。输出是:

Waiting for _stop_event
^CReceived SIG

我可以继续发送SIGINT信号。进程不会退出,输出是:

Waiting for _stop_event
^CReceived SIG
^CReceived SIG
^CReceived SIG
^CReceived SIG
....

如果我把Class1.wait方法换成检查event.is_set,所有事情就都正常了:

def wait(self):
    print("Waiting for _stop_event")
    while True:
        if Class1._stop_event.is_set():
            print("_stop_event set.")
            break

进程会退出,输出是:

Waiting for _stop_event
^CReceived SIG
_stop_event set.

怎么才能让event.wait在事件被设置后返回呢?为什么wait方法连超时都不再发生了?

5 个回答

0

正如我在问题 #85772 中提到的:

我花了很多时间在这个问题上。我有一个叫做 multiprocessing.Event 的东西,它会让子进程的主线程休眠,还有一个中断处理程序会设置这个事件。根据这里的描述,当中断触发事件时,进程就会卡住。经过一些网上的研究,包括查看提到的 StackOverflow 话题,我找到了以下解决方案:

对于使用 fork 启动方法的 Linux:

  1. 在中断处理程序中,调用 threading.Thread(target=exit_event.set).start()
  2. 把 multiprocessing.Event 改成 threading.Event

对于使用 spawn 启动方法的 Linux:

  1. 在中断处理程序中,调用 threading.Thread(target=exit_event.set).start()

对于 Windows:

  1. 与其使用 signal.signal() 来改变中断处理程序,不如使用:

    import win32api

    win32api.SetConsoleCtrlHandler(handler, add)

参考链接: https://learn.microsoft.com/en-us/windows/console/setconsolectrlhandler

这个方法效果很好。关键的不同之处在于它在一个名为 Dummy-1 的线程上运行中断函数。

  1. (这不是一个很好的解决方案) 在中断处理程序中,调用 threading.Thread(target=exit_event.set).start()
    • 这个方法解决了卡住的问题,但会引发一个 InterruptedError 异常
0

在Alex的出色研究基础上,这意味着从另一个线程设置标志不会导致死锁。如果把中断方法改成下面这样,原来的代码就能正常工作:

from threading import Thread

(...)
    @staticmethod
    def interrupt():
        Thread(target=Class1._stop_event.set).start()
2

你也可以用 pause() 这个函数,它在 signal 模块里,来代替 Event().wait()signal.pause() 会让程序暂停,直到收到一个信号。在这个情况下,当收到 SIGINT 信号时,signal.pause() 就会结束,并且不返回任何东西。需要注意的是,根据文档,这个函数在 Windows 系统上是不能用的。我在 Linux 上试过,效果很好。

我是在这个 SO 讨论串 中发现这个解决方案的。

7

信号只能在主线程中处理。如果主线程在执行系统调用时被阻塞,那么这个系统调用就会引发一个InterruptedError(中断错误)。

根据Python文档的说法:

信号只能在Python解释器的“原子”指令之间发生。

比如说,time.sleep这个方法会引发一个InterruptedError。看起来event.wait方法在这种情况下处理得不太对,它不会引发InterruptedError,而是直接卡住。这在我看来像是Python的一个bug?


更新:

我发现这是在multiprocessing.Event中出现的死锁。如果主线程在等待一个事件,同时一个信号在被中断的主线程上设置了这个事件,那么multiprocessing.event.set()和multiprocessing.event.wait()这两个方法就会互相卡住。

而且,这种行为在不同平台上差别很大。例如,在Windows上,time.sleep()会引发一个InterruptedError,但在Linux上则会直接返回。


一个很笨重的解决办法是让主线程保持空闲,以便处理信号。

import multiprocessing
import signal
import threading
import time


class Class1(object):
    _stop_event = multiprocessing.Event()

    @staticmethod
    def interrupt():
        Class1._stop_event.set()

    def wait_timeout(self):
        print("Waiting for _stop_event")
        if Class1._stop_event.wait(30):
            print("_stop_event set.")
        else:
            print("Timeout")


def stop(signum, frame):
    print("Received SIG")
    Class1.interrupt()
    exit_event.set()


def run():
    c = Class1()
    c.wait_timeout()

t = threading.Thread(target=run)
t.daemon = False
t.start()

exit_event = multiprocessing.Event()
signal.signal(signal.SIGINT, stop)

while not exit_event.is_set():
    # Keep a main thread around to handle the signal and
    # that thread must not be in a event.wait()!
    try:
        time.sleep(500)
    except InterruptedError:
        # We were interrupted by the incoming signal.
        # Let the signal handler stop the process gracefully.
        pass

这实在是太难看了。希望有人能提供一个更优雅的解决方案……

4

你们一定会喜欢这个。使用 threading.Event,而不是 multiprocessing.Event。这样当你按下 ^C 时,信号处理器就会像应该的那样被调用!

来源

import threading
import signal

class Class1(object):
    _stop_event = threading.Event()

    @staticmethod
    def interrupt():
        Class1._stop_event.set()

    def wait(self):
        print("Waiting for _stop_event")
        if Class1._stop_event.wait(5):
            print("_stop_event set.")
        else:
            print("Timed out.")

def stop(signum, frame):
    print("Received SIG")
    Class1.interrupt()

signal.signal(signal.SIGINT, stop)

c = Class1()
c.wait()

输出

Waiting for _stop_event
^CReceived SIG
_stop_event set.

撰写回答