多线程Python中的信号处理
这应该是个很简单的问题,我很惊讶在StackOverflow上找不到相关的答案。
我有一个像守护进程一样的程序,需要对SIGTERM和SIGINT信号做出响应,这样才能和upstart配合得好。我了解到,最好的做法是把程序的主循环放在一个单独的线程里,让主线程来处理信号。然后,当接收到信号时,信号处理器应该通过设置一个标志位来告诉主循环退出,这个标志位会在主循环中定期检查。
我试过这样做,但结果并没有如我所预期的那样。请看下面的代码:
from threading import Thread
import signal
import time
import sys
stop_requested = False
def sig_handler(signum, frame):
sys.stdout.write("handling signal: %s\n" % signum)
sys.stdout.flush()
global stop_requested
stop_requested = True
def run():
sys.stdout.write("run started\n")
sys.stdout.flush()
while not stop_requested:
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
t = Thread(target=run)
t.start()
t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()
我用以下两种方式进行了测试:
1)
$ python main.py > output.txt&
[2] 3204
$ kill -15 3204
2)
$ python main.py
ctrl+c
在这两种情况下,我都希望输出中能看到以下内容:
run started
handling signal: 15
run exited
join completed
在第一种情况下,程序退出了,但我看到的只有:
run started
在第二种情况下,当按下ctrl+c时,SIGTERM信号似乎被忽略,程序没有退出。
我这里漏掉了什么呢?
3 个回答
我在这里遇到了同样的问题,具体可以看这个链接:多个线程连接时信号未处理。在阅读了abarnert的回答后,我把代码改成了Python 3,问题就解决了。不过,我其实想把我所有的程序都改成Python 3。所以,我通过避免在发送信号之前调用线程的join()方法来解决了我的程序。下面是我的代码。
虽然这个方法不是特别好,但在Python 2.7中解决了我的问题。我的问题被标记为重复,所以我把我的解决方案放在这里。
import threading, signal, time, os
RUNNING = True
threads = []
def monitoring(tid, itemId=None, threshold=None):
global RUNNING
while(RUNNING):
print "PID=", os.getpid(), ";id=", tid
time.sleep(2)
print "Thread stopped:", tid
def handler(signum, frame):
print "Signal is received:" + str(signum)
global RUNNING
RUNNING=False
#global threads
if __name__ == '__main__':
signal.signal(signal.SIGUSR1, handler)
signal.signal(signal.SIGUSR2, handler)
signal.signal(signal.SIGALRM, handler)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGQUIT, handler)
print "Starting all threads..."
thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60})
thread1.start()
threads.append(thread1)
thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60})
thread2.start()
threads.append(thread2)
while(RUNNING):
print "Main program is sleeping."
time.sleep(30)
for thread in threads:
thread.join()
print "All threads stopped."
abarnert的回答非常准确。不过我现在还在用Python 2.7。为了自己解决这个问题,我写了一个叫做InterruptableThread的类。
目前这个类不支持给线程目标传递额外的参数。而且它的Join方法也不接受超时参数。这只是因为我自己不需要这些功能。如果你需要的话,可以自己添加。使用这个类的时候,你可能想把输出语句去掉,因为它们只是用来注释和测试的。
import threading
import signal
import sys
class InvalidOperationException(Exception):
pass
# noinspection PyClassHasNoInit
class GlobalInterruptableThreadHandler:
threads = []
initialized = False
@staticmethod
def initialize():
signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler)
signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler)
GlobalInterruptableThreadHandler.initialized = True
@staticmethod
def add_thread(thread):
if threading.current_thread().name != 'MainThread':
raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.")
if not GlobalInterruptableThreadHandler.initialized:
GlobalInterruptableThreadHandler.initialize()
GlobalInterruptableThreadHandler.threads.append(thread)
@staticmethod
def sig_handler(signum, frame):
sys.stdout.write("handling signal: %s\n" % signum)
sys.stdout.flush()
for thread in GlobalInterruptableThreadHandler.threads:
thread.stop()
GlobalInterruptableThreadHandler.threads = []
class InterruptableThread:
def __init__(self, target=None):
self.stop_requested = threading.Event()
self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run)
def run(self):
pass
def start(self):
GlobalInterruptableThreadHandler.add_thread(self)
self.t.start()
def stop(self):
self.stop_requested.set()
def is_stop_requested(self):
return self.stop_requested.is_set()
def join(self):
try:
while self.t.is_alive():
self.t.join(timeout=1)
except (KeyboardInterrupt, SystemExit):
self.stop_requested.set()
self.t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()
这个类可以有两种用法。你可以继承InterruptableThread:
import time
import sys
from interruptable_thread import InterruptableThread
class Foo(InterruptableThread):
def __init__(self):
InterruptableThread.__init__(self)
def run(self):
sys.stdout.write("run started\n")
sys.stdout.flush()
while not self.is_stop_requested():
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
sys.stdout.write("all exited\n")
sys.stdout.flush()
foo = Foo()
foo2 = Foo()
foo.start()
foo2.start()
foo.join()
foo2.join()
或者你也可以像使用threading.thread那样使用它。不过,run方法必须把InterruptableThread对象作为参数。
import time
import sys
from interruptable_thread import InterruptableThread
def run(t):
sys.stdout.write("run started\n")
sys.stdout.flush()
while not t.is_stop_requested():
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
t1 = InterruptableThread(run)
t2 = InterruptableThread(run)
t1.start()
t2.start()
t1.join()
t2.join()
sys.stdout.write("all exited\n")
sys.stdout.flush()
你可以随意使用它。
问题在于,正如在Python信号处理器的执行中所解释的:
Python的信号处理器并不会在底层的(C语言)信号处理器内部执行。相反,底层信号处理器会设置一个标志,告诉虚拟机在稍后的某个时刻执行相应的Python信号处理器(比如在下一个字节码指令时)。
…
一个纯C实现的长时间运行的计算(比如在大量文本上进行正则表达式匹配)可能会不间断地运行很长时间,而不管收到任何信号。Python的信号处理器会在计算完成后被调用。
你的主线程在threading.Thread.join
上被阻塞,这最终意味着它在C语言的pthread_join
调用上被阻塞。当然,这不是一个“长时间运行的计算”,而是一个系统调用的阻塞……但无论如何,在这个调用完成之前,你的信号处理器是无法运行的。
而且,在某些平台上,pthread_join
在收到信号时会失败并返回EINTR
,而在其他平台上则不会。在Linux上,我认为这取决于你选择BSD风格还是默认的siginterrupt
行为,但默认情况下是不会的。
那么,你能做些什么呢?
好吧,我很确定Python 3.3中信号处理的变化实际上改变了Linux上的默认行为,所以如果你升级到3.3及以上版本,就不需要做任何事情;只需在3.3+版本下运行,你的代码就会按预期工作。至少在我使用的OS X上的CPython 3.4和Linux上的3.3中是这样的。(如果我说错了,不确定这是否是CPython的一个bug,所以你可能想在python-list上提出来,而不是直接开一个问题……)
另一方面,在3.3之前,signal
模块确实没有提供你需要的工具来自己解决这个问题。所以,如果你不能升级到3.3,解决方案就是等待某个可中断的东西,比如Condition
或Event
。子线程在退出前通知事件,主线程在加入子线程之前等待这个事件。这确实有点黑科技。而且我找不到任何东西能保证这样做会有区别;这只是恰好在我使用的各种CPython 2.7和3.2的OS X以及2.6和2.7的Linux上有效……