在Python中,如何判断一个进程是否完成?

30 投票
5 回答
35107 浏览
提问于 2025-04-16 11:48

在一个Python的图形界面程序(用的是PyGTK)中,我启动了一个进程(使用多进程)。这个进程需要很长时间才能完成,大约需要20分钟。当这个进程完成后,我想要清理一下(提取结果并结束进程)。我该怎么知道这个进程什么时候完成呢?

我的同事建议在父进程中使用一个忙等待的循环,去检查子进程是否完成。肯定有更好的方法。

在Unix系统中,当一个进程被分叉时,父进程会收到一个信号,表示子进程已经完成。但是我在Python中没有看到类似的东西。我是不是漏掉了什么?

父进程是怎么知道子进程结束的呢?(当然,我不想调用Process.join(),因为那样会让图形界面卡住。)

这个问题不仅仅涉及多进程:我在多线程中也遇到了完全一样的问题。

5 个回答

2

你可以使用一个叫做 队列 来和子进程进行沟通。你可以把中间结果放进去,或者放一些消息来表示进度(比如进度条)或者只是简单地告诉你这个进程准备好了,可以合并了。用 empty 来检查队列是否为空既简单又快速。

如果你只是想知道进程是否完成,你可以查看进程的 退出代码,或者用 is_alive() 来检查它是否还在运行。

12

我觉得为了让Python能够在不同平台上运行,一些简单的事情,比如SIGCHLD信号,得自己处理。没错,这样做会多花点时间,尤其是当你只是想知道子进程什么时候完成时,但其实并没有那么麻烦。想想下面这个例子,它使用一个子进程来完成工作,创建了两个multiprocessing.Event实例,还有一个线程来检查子进程是否完成:

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"

编辑

把所有创建的进程和线程都连接起来是个好习惯,因为这样可以帮助你发现那些“僵尸”进程(永远不会结束的进程/线程)。我对上面的代码做了一些修改,创建了一个ChildChecker类,它继承自threading.Thread。这个类的唯一目的是在一个单独的进程中启动一个任务,等待这个进程完成,然后通知图形界面一切都完成了。连接ChildChecker也会连接它正在“检查”的进程。如果这个进程在5秒内没有结束,线程会强制终止这个进程。输入“y”会启动一个子进程,运行“endlessChildsPlay”,这个过程必须展示强制终止的效果。

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"
5

这个答案其实很简单!(我花了几天才搞明白。)

结合PyGTK的idle_add(),你可以创建一个自动加入线程(AutoJoiningThread)。总的代码非常简单:

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)

如果你想做的不仅仅是加入线程(比如收集结果),那么你可以扩展上面的类,让它在完成时发出信号,就像下面的例子那样:

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

上面例子的输出会根据线程执行的顺序而有所不同,但大致会像这样:

Creating child
Creating thread
Starting thread
Child starting to play.
 Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt.  Quiting.

不过,不能用同样的方法创建一个自动加入进程(AutoJoiningProcess),因为我们不能在两个不同的进程之间调用idle_add()。不过,我们可以用自动加入线程来实现我们想要的效果:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()

为了演示自动加入进程,这里有另一个例子:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
    print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

最终的输出会和上面的例子非常相似,只不过这次我们有进程加入和它的辅助线程也在加入:

Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
 Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt.  Quiting.

不幸的是:

  1. 这个解决方案依赖于gobject,因为使用了idle_add()。gobject是PyGTK使用的。
  2. 这并不是一个真正的父子关系。如果一个线程是由另一个线程启动的,那么它仍然会被运行主循环的线程加入,而不是父线程。这个问题在自动加入进程中也是一样,只不过我想那时候会抛出一个异常。

因此,使用这种方法时,最好只在主循环/图形界面中创建线程或进程。

撰写回答