使用Python多进程管道

13 投票
1 回答
11024 浏览
提问于 2025-04-17 05:14

我正在尝试写一个类,用来通过多个进程计算校验和,这样可以利用多个核心的优势。我有一个很简单的类,运行起来效果很好,特别是在处理简单的情况时。但是每当我创建两个或更多这个类的实例时,工作进程就不会退出。看起来它似乎没有收到父进程关闭管道的消息。

下面是所有的代码。我首先分别计算md5和sha1的校验和,这个部分是可以正常工作的。然后我尝试并行计算,但当要关闭管道时,程序就卡住了。

这到底是怎么回事呢?为什么管道没有按我预期的那样工作?我想我可以通过在队列中发送一个“停止”消息来解决这个问题,让子进程以这种方式退出,但我真的很想知道为什么现在的方式不奏效。

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

附注:这个问题已经解决 如果有人感兴趣,这里有一个可以正常工作的代码版本:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()

1 个回答

7

是的,这确实是个让人意外的情况。

不过,如果你查看一下两个并行子进程的lsof输出,你会发现第二个子进程打开的文件描述符比第一个多。

发生的情况是,当两个并行的子进程启动时,第二个子进程会继承父进程的管道。因此,当父进程调用self.parent_conn.close()时,第二个子进程仍然保持着那个管道的文件描述符是打开的,这样在内核中这个管道的文件描述符就不会被关闭(引用计数大于0),结果就是第一个并行子进程中的self.child_conn.recv_bytes()永远不会读取到EOF,也就不会抛出EOFError错误。

你可能需要发送一个明确的关闭消息,而不仅仅是关闭文件描述符,因为似乎对哪些文件描述符会在不同进程之间共享没有太多控制(没有关闭时复制文件描述符的标志)。

撰写回答