如何在Python numpy中并行化求和计算?
我正在尝试计算一个总和,但在让代码并行运行时遇到了困难。我想并行处理的计算有点复杂(它使用了numpy数组和scipy稀疏矩阵)。最终结果是一个numpy数组,我想把大约1000次计算的输出数组加起来。理想情况下,我希望在所有迭代中保持一个持续的总和。不过,我还没找到合适的方法来实现这一点。
到目前为止,我尝试过使用joblib的Parallel函数和python的multiprocessing包中的pool.map函数。在这两种情况下,我都使用了一个内部函数来返回一个numpy数组。这些函数返回的是一个列表,我再把它转换成numpy数组,然后进行求和。
但是,在joblib的Parallel函数完成所有迭代后,主程序似乎没有继续运行(看起来原来的任务处于挂起状态,CPU使用率为0%)。而使用pool.map时,在所有迭代完成后,我会遇到内存错误。
有没有简单的方法可以并行计算数组的运行总和呢?
编辑: 目标是做类似下面的事情,只不过是并行处理。
def summers(num_iters):
sumArr = np.zeros((1,512*512)) #initialize sum
for index in range(num_iters):
sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array
return sumArr
2 个回答
我不太明白你说的问题。你是想把一个列表分配给一组工作者,让他们各自计算自己的和,然后再把结果加起来吗?
#!/bin/env python
import sys
import random
import time
import multiprocessing
import numpy as np
numpows = 5
numitems = 25
nprocs = 4
def expensiveComputation( i ):
time.sleep( random.random() * 10 )
return np.array([i**j for j in range(numpows)])
def listsum( l ):
sum = np.zeros_like(l[0])
for item in l:
sum = sum + item
return sum
def partition(lst, n):
division = len(lst) / float(n)
return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ]
def myRunningSum( l ):
sum = np.zeros(numpows)
for item in l:
sum = sum + expensiveComputation(item)
return sum
if __name__ == '__main__':
random.seed(1)
data = range(numitems)
pool = multiprocessing.Pool(processes=4,)
calculations = pool.map(myRunningSum, partition(data,nprocs))
print 'Answer is:', listsum(calculations)
print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.])
(这个分配函数来自于 Python: 将列表切分成n个几乎相等长度的部分)
我找到了一个用多进程来并行计算数组总和的方法,使用了apply_async和回调,所以我把这个分享出来,方便其他人参考。我参考了Parallel Python的示例页面中的Sum回调类,虽然我并没有实际使用那个包来实现,但它给了我使用回调的灵感。下面是我最终使用的简化代码,它能完成我想要的功能。
import multiprocessing
import numpy as np
import thread
class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
def __init__(self):
self.value = np.zeros((1,512*512)) #this is the initialization of the sum
self.lock = thread.allocate_lock()
self.count = 0
def add(self,value):
self.count += 1
self.lock.acquire() #lock so sum is correct if two processes return at same time
self.value += value #the actual summation
self.lock.release()
def computation(index):
array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
return array1
def summers(num_iters):
pool = multiprocessing.Pool(processes=8)
sumArr = Sum() #create an instance of callback class and zero the sum
for index in range(num_iters):
singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)
pool.close()
pool.join() #waits for all the processes to finish
return sumArr.value
我还成功地使用了一个并行化的map,这个方法在另一个回答中被提到过。我之前也尝试过这个,但没有正确实现。两种方法都可以用,我觉得这个回答对选择使用哪种方法(map还是apply.async)解释得很好。对于map版本,你不需要定义Sum类,summers函数变成了
def summers(num_iters):
pool = multiprocessing.Pool(processes=8)
outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
sumArr = np.zeros((1,512*512)) #but I do to make sure I have the memory
outputArr = np.array(pool.map(computation, range(num_iters)))
sumArr = outputArr.sum(0)
pool.close() #not sure if this is still needed since map waits for all iterations
return sumArr