大对象与`multiprocessing`管道及`send()`

5 投票
1 回答
6596 浏览
提问于 2025-04-17 17:26

我最近发现,如果我们使用 multiprocessing.Pipe 创建一对父子连接对象,而我们试图通过这个管道发送的对象 obj 太大了,我的程序就会卡住,不会抛出任何异常,也不会做任何事情。下面是代码示例。(这段代码使用 numpy 包生成一个很大的浮点数组。)

import multiprocessing as mp
import numpy as np

def big_array(conn, size=1200):
    a = np.random.rand(size)
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 1200])
    proc.start()
    print "Child process started."
    proc.join()
    print "Child process joined."
    a = parent_conn.recv()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

输出结果如下。

Main process started.
Child process started.
Child process trying to send array of 1200 floats.

然后程序就会在这里无限期地卡住。不过,如果我们尝试发送一个包含1000个浮点数的数组,而不是1200个,那么程序就能成功执行,并且输出结果也和预期的一样。

Main process started.
Child process started.
Child process trying to send array of 1000 floats.
Child process joined.
Received the following object.
Type: <type 'numpy.ndarray'>. Size: 1000.
Press any key to continue . . .

这看起来像是一个bug。文档中提到:

send(obj) 将一个对象发送到连接的另一端,接收方应该使用 recv() 来读取。

这个对象必须是可序列化的。非常大的序列化对象(大约32MB以上,具体取决于操作系统)可能会引发一个 ValueError 异常。

但是在我的运行中,连 ValueError 异常都没有抛出,程序就这样卡住了。此外,长度为1200的 numpy 数组大小为9600字节,肯定没有超过32MB!这看起来真的是个bug。有没有人知道怎么解决这个问题?

顺便说一下,我使用的是Windows 7,64位系统。

1 个回答

18

试着把 join() 放在 recv() 下面:

import multiprocessing as mp

def big_array(conn, size=1200):
    a = "a" * size
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 120000])
    proc.start()
    print "Child process started."
    print "Child process joined."
    a = parent_conn.recv()
    proc.join()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

但是我不太明白为什么你的例子即使在小数据量时也能正常工作。我原本以为,先写入管道,然后让进程进行合并,而不先从管道读取数据,会导致合并被阻塞。你应该先从管道接收数据,然后再合并。但显然在小数据量时并不会阻塞...?

编辑:根据文档 (http://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming):

“一个会导致死锁的例子是下面这个:”

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

撰写回答