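How can I tell when a process has finished in Python?
From within a Python GUI application (PyGTK), I start a process (using multiprocessing). The process takes a long time to finish, about 20 minutes. When it has finished, I would like to clean up (extract the results and join the process). How do I know when the process has finished?
My colleague suggested a busy-wait loop in the parent process that checks whether the child has finished. Surely there is a better way.
In Unix, when a process is forked, the parent is sent a signal when the child has finished. But I cannot see anything like that in Python. Am I missing something?
How can the parent process observe the end of a child process? (Of course, I do not want to call Process.join(), as that would freeze the GUI.)
This question is not limited to multiprocessing: I have exactly the same problem with multithreading.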
5 Answers
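You can use a queue to communicate with child processes. You can put intermediate results on it, or messages indicating progress (for a progress bar, say), or just a message telling you that the process is ready to be joined. Polling it with empty() is simple and fast, although the multiprocessing documentation notes that empty() is not fully reliable, so treat an "empty" answer as "nothing yet".
If you only want to know whether the process has finished, you can check its exitcode (which is None while the process is still running), or poll is_alive().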
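A minimal sketch of that polling approach, written for Python 3 with only the standard library. The names work and poll are made up for illustration, and the sleep in the demo loop merely stands in for a GUI timer callback:

```python
import multiprocessing
import queue  # only needed for the queue.Empty exception
import time


def work(results):
    """Runs in the child process: report progress, then a final message."""
    results.put("started")
    time.sleep(0.5)  # stand-in for the long-running job
    results.put("done")


def poll(process, results):
    """Non-blocking check, suitable for calling from a GUI timer.

    Returns (finished, messages).
    """
    # Check liveness before draining: if the child has already exited,
    # anything it put on the queue should already be waiting there.
    finished = not process.is_alive()
    messages = []
    while True:
        try:
            messages.append(results.get_nowait())
        except queue.Empty:
            break
    return finished, messages


if __name__ == "__main__":
    results = multiprocessing.Queue()
    child = multiprocessing.Process(target=work, args=(results,))
    child.start()
    finished = False
    while not finished:
        finished, messages = poll(child, results)
        for message in messages:
            print("child says:", message)
        time.sleep(0.1)  # a GUI would return to its main loop here
    child.join()  # returns at once, the child has already exited
    print("exit code:", child.exitcode)
```

In a real GUI, poll would be called from a periodic timer rather than a loop, so the interface stays responsive between checks.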
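I think that, as part of keeping Python multi-platform, simple things like SIGCHLD have to be done yourself. Agreed, this is a bit more work when all you want to know is when the child is done, but it really isn't that painful. Consider the following, which uses a child process to do the work, two multiprocessing.Event instances, and a thread that checks whether the child process is done: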
# Python 2 (print statement, raw_input)
import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"
Edit
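Joining every process and thread you create is good practice, because it helps reveal "zombie" (never-finishing) processes and threads. I have altered the code above into a ChildChecker class that inherits from threading.Thread. Its sole purpose is to start a job in a separate process, wait for that process to finish, and then notify the GUI that everything is complete. Joining the ChildChecker also joins the process it is "checking". If the process does not join within 5 seconds, the thread forcibly terminates it. Entering "y" starts a child process running endlessChildsPlay, which never finishes and therefore demonstrates the forced termination.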
import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
    # never reached: this child has to be terminated by its parent
    event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join! Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()

    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()

    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"
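The core of the idea (a helper thread that starts the process, waits for it with a timeout, terminates it if necessary, and then reports back) can also be sketched more compactly in Python 3. This is only an illustration under stated assumptions: watch, short_task and the on_done callback are hypothetical names, and the callback runs in the helper thread, so a GUI program would still have to hand the notification over to its main loop.

```python
import multiprocessing
import threading
import time


def watch(process, on_done, timeout=None):
    """Start `process` and wait for it in a helper thread.

    Calls on_done(exitcode) from that helper thread once the process
    has exited, terminating it first if `timeout` seconds pass.
    """
    def _wait():
        process.start()
        process.join(timeout)
        if process.is_alive():  # still running after the timeout
            process.terminate()
            process.join()
        on_done(process.exitcode)

    watcher = threading.Thread(target=_wait)
    watcher.start()
    return watcher


def short_task():
    time.sleep(0.2)  # stand-in for the real job


def report(exitcode):
    print("child exited with code", exitcode)


if __name__ == "__main__":
    child = multiprocessing.Process(target=short_task)
    watcher = watch(child, report, timeout=5)
    print("main thread stays free while the child runs")
    watcher.join()
```

Because the helper thread does the blocking join, the main thread never waits on the child directly.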
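The answer is actually quite simple! (It just took me days to work it out.)
Combined with PyGTK's idle_add(), you can create an AutoJoiningThread. The total code is borderline trivial: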
import threading
import gobject

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        # ask the main loop to join this thread once it is idle
        gobject.idle_add(self.join)
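If you want to do more than just join (such as collecting results), you can extend the class above to emit a signal on completion, as in the following example: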
import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt. Quiting."
        sys.exit()

    print "God knows how we got here. Quiting."
    sys.exit()
The output of the example above depends on the order in which the threads run, but it will look roughly like this:
Creating child
Creating thread
Starting thread
Child starting to play.
Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt. Quiting.
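What makes this work is the hand-off: the worker thread only schedules a callable, and the main loop later runs it in its own thread. A toolkit-independent sketch of that hand-off in Python 3 follows. TinyMainLoop is a hypothetical stand-in written for this illustration, not the gobject API, and the main_loop and on_finished parameters are likewise assumptions.

```python
import queue
import threading


class TinyMainLoop:
    """Hypothetical stand-in for a GUI main loop (not the gobject API)."""

    def __init__(self):
        self._pending = queue.Queue()

    def idle_add(self, func, *args):
        """Thread-safe: schedule func(*args) to run in the loop's thread."""
        self._pending.put((func, args))

    def run_until(self, done):
        """Run scheduled callables until the `done` event is set."""
        while not done.is_set():
            try:
                func, args = self._pending.get(timeout=0.1)
            except queue.Empty:
                continue
            func(*args)


class AutoJoiningThread(threading.Thread):
    def __init__(self, main_loop, on_finished, **kwargs):
        super().__init__(**kwargs)
        self.main_loop = main_loop
        self.on_finished = on_finished

    def run(self):
        super().run()  # runs the target passed to the constructor
        # Hand the clean-up to the thread that runs the main loop.
        self.main_loop.idle_add(self._finish)

    def _finish(self):
        self.join()  # executed by the loop's thread, never by this one
        self.on_finished()


if __name__ == "__main__":
    loop = TinyMainLoop()
    done = threading.Event()

    def finished():
        print("worker joined by", threading.current_thread().name)
        done.set()

    worker = AutoJoiningThread(loop, finished,
                               target=lambda: print("working..."))
    worker.start()
    loop.run_until(done)
```

A thread cannot join itself, which is why the join has to be scheduled onto the loop's thread rather than called at the end of run().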
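It is not possible to make an AutoJoiningProcess in the same way, however, because idle_add() cannot be called across two different processes. We can, though, use an AutoJoiningThread to get what we want: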
import multiprocessing

# relies on the AutoJoiningThread class defined above
class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
To demonstrate AutoJoiningProcess, here is another example:
import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                                 args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt. Quiting."
        sys.exit()

    print "God knows how we got here. Quiting."
    sys.exit()
The resulting output is very similar to that of the example above, except that this time both the process and its helper thread are joined:
Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt. Quiting.
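Unfortunately:
- This solution depends on gobject, because it uses idle_add(). gobject is what PyGTK uses.
- It is not a true parent/child relationship. If one of these threads is started by another thread, it will still be joined by the thread running the main loop, not by its parent thread. The same holds for AutoJoiningProcess, except that in that case I imagine an exception would be thrown.
So, when using this approach, it is best to create threads/processes only from within the main loop/GUI.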
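Where the gobject dependency is a concern, one gobject-free alternative is the standard library's concurrent.futures module, which can call a function when a background job completes. This is a different technique from the one above, and it assumes Python 3, so it would only apply to a port of the Python 2/PyGTK code shown here. The names start_task, slow_square and report are made up for this sketch. The callback runs in a helper thread of the parent process, not in the GUI thread, so a GUI would still forward it to its main loop.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slow_square(x):
    """Stand-in for the long-running job."""
    time.sleep(0.2)
    return x * x


def start_task(executor, on_done, func, *args):
    """Submit func(*args) and call on_done(future) once it has completed."""
    future = executor.submit(func, *args)
    future.add_done_callback(on_done)
    return future


def report(future):
    # Runs in a helper thread of the parent process, not the GUI thread.
    print("result:", future.result())


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        start_task(pool, report, slow_square, 7)
        print("main thread is free while the job runs")
```

The same start_task helper works with a ThreadPoolExecutor, which covers the multithreading variant of the question.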