如何结合itertools和multiprocessing?

13 投票
3 回答
7906 浏览
提问于 2025-04-17 01:28

我有一个 256x256x256 的 Numpy 数组,里面的每个元素都是一个矩阵。我需要对这些矩阵进行一些计算,并且想用 multiprocessing 模块来加快速度。

这些计算的结果必须存储在一个和原数组一样的 256x256x256 数组中,也就是说,原数组中元素 [i,j,k] 的矩阵计算结果必须放到新数组的 [i,j,k] 位置。

为了做到这一点,我想创建一个列表,可以用一种伪代码的方式表示为 [array[i,j,k], (i, j, k)],然后把它传递给一个函数进行“多进程处理”。假设 matrices 是从原数组提取出的所有矩阵的列表,而 myfunc 是进行计算的函数,代码大概是这样的:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

不过,看起来 map_async 实际上是先创建了一个巨大的 finput 列表:我的 CPU 并没有太多工作,但内存和交换空间在几秒钟内就被完全占满,这显然不是我想要的。

有没有办法在不显式创建这个巨大的列表的情况下,将它传递给一个多进程函数?或者你知道其他解决这个问题的方法吗?

非常感谢!:-)

3 个回答

0

Pool.map_async() 这个函数需要知道你要处理的数据有多长,这样才能把工作分配给多个工作者。因为 izip 这个工具没有 __len__ 这个功能,所以它会先把你的数据转换成一个列表,这样就会消耗大量的内存,导致你遇到的问题。

你可以尝试自己创建一个类似 izip 的迭代器,并给它加上 __len__ 的功能,这样就能避免这个问题。

2

我也遇到过这个问题。不是这样:

res = p.map(func, combinations(arr, select_n))

res = p.imap(func, combinations(arr, select_n))

imap没有消耗它!

12

所有的 multiprocessing.Pool.map* 方法在调用函数时会一次性把迭代器里的内容全部消耗掉(示例代码)。如果你想让 map 函数一次只处理迭代器中的一部分内容,可以使用 grouper_nofill

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it=iter(iterable)
    def take():
        while 1: yield list(itertools.islice(it,n))
    return iter(take().next,[])

chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)

另外,pool.map_asyncchunksize 参数做的事情有点不同:它会把可迭代对象分成几块,然后把每一块交给一个工作进程,这个进程会调用 map(func, chunk)。这样做的好处是,如果 func(item) 执行得很快,工作进程就能处理更多的数据,但在你的情况中并没有帮助,因为迭代器在调用 map_async 后还是会立刻被完全消耗掉。

撰写回答