如何结合itertools和multiprocessing?
我有一个 256x256x256
的 Numpy 数组,里面的每个元素都是一个矩阵。我需要对这些矩阵进行一些计算,并且想用 multiprocessing
模块来加快速度。
这些计算的结果必须存储在一个和原数组一样的 256x256x256
数组中,也就是说,原数组中元素 [i,j,k]
的矩阵计算结果必须放到新数组的 [i,j,k]
位置。
为了做到这一点,我想创建一个列表,可以用一种伪代码的方式表示为 [array[i,j,k], (i, j, k)]
,然后把它传递给一个函数进行“多进程处理”。假设 matrices
是从原数组提取出的所有矩阵的列表,而 myfunc
是进行计算的函数,代码大概是这样的:
import multiprocessing
import numpy as np
from itertools import izip
def myfunc(finput):
# Do some calculations...
...
# ... and return the result and the index:
return (result, finput[1])
# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)
# Make function input from the matrices and the indices:
finput = izip(matrices, inds)
pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
不过,看起来 map_async
实际上是先创建了一个巨大的 finput
列表:我的 CPU 并没有太多工作,但内存和交换空间在几秒钟内就被完全占满,这显然不是我想要的。
有没有办法在不显式创建这个巨大的列表的情况下,将它传递给一个多进程函数?或者你知道其他解决这个问题的方法吗?
非常感谢!:-)
3 个回答
Pool.map_async()
这个函数需要知道你要处理的数据有多长,这样才能把工作分配给多个工作者。因为 izip
这个工具没有 __len__
这个功能,所以它会先把你的数据转换成一个列表,这样就会消耗大量的内存,导致你遇到的问题。
你可以尝试自己创建一个类似 izip
的迭代器,并给它加上 __len__
的功能,这样就能避免这个问题。
我也遇到过这个问题。不是这样:
res = p.map(func, combinations(arr, select_n))
做
res = p.imap(func, combinations(arr, select_n))
imap没有消耗它!
所有的 multiprocessing.Pool.map*
方法在调用函数时会一次性把迭代器里的内容全部消耗掉(示例代码)。如果你想让 map 函数一次只处理迭代器中的一部分内容,可以使用 grouper_nofill
。
def grouper_nofill(n, iterable):
'''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
'''
it=iter(iterable)
def take():
while 1: yield list(itertools.islice(it,n))
return iter(take().next,[])
chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)
另外,pool.map_async
的 chunksize
参数做的事情有点不同:它会把可迭代对象分成几块,然后把每一块交给一个工作进程,这个进程会调用 map(func, chunk)
。这样做的好处是,如果 func(item)
执行得很快,工作进程就能处理更多的数据,但在你的情况中并没有帮助,因为迭代器在调用 map_async
后还是会立刻被完全消耗掉。