Python multiprocessing Pool.map 为什么会调用 aquire?
我有一个大小为640x480的numpy数组,每个数组里有630张图片。总的数组大小就是630x480x640。我想生成一张平均图像,并计算所有630张图片中每个像素的标准差。
这个操作其实很简单,可以通过下面的代码实现:
avg_image = numpy.mean(img_array, axis=0)
std_image = numpy.std(img_array, axis=0)
不过,因为我需要处理大约50个这样的数组,而且我的工作站有8个核心和16个线程,所以我想贪心一点,使用multiprocessing.Pool来并行处理。
于是我做了以下操作:
def chunk_avg_map(chunk):
#do the processing
sig_avg = numpy.mean(chunk, axis=0)
sig_std = numpy.std(chunk, axis=0)
return([sig_avg, sig_std])
def chunk_avg(img_data):
#take each row of the image
chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]
pool = multiprocessing.Pool()
result = pool.map(chunk_avg_map, chunks)
pool.close()
pool.join()
return result
但是,我发现速度提升并不明显。我在chunk_avg_map里加了打印语句,发现一次只启动了一两个进程,而不是我预期的16个。
接着,我在iPython中用cProfile分析了我的代码:
%prun current_image_anal.main()
结果显示,绝大部分时间都花在了调用acquire上:
ncalls tottime percall cumtime percall filename:lineno(function)
1527 309.755 0.203 309.755 0.203 {built-in method acquire}
我知道这和锁定有关,但我不明白为什么我的代码会这样。有没有人能给我一些建议?
[编辑] 按要求,这里有一个可以运行的脚本,展示了这个问题。你可以用任何方式进行分析,但我发现大部分时间都花在了调用acquire上,而不是我预期的平均值或标准差。
#!/usr/bin/python
import numpy
import multiprocessing
def main():
fake_images = numpy.random.randint(0,2**14,(630,480,640))
chunk_avg(fake_images)
def chunk_avg_map(chunk):
#do the processing
sig_avg = numpy.mean(chunk, axis=0)
sig_std = numpy.std(chunk, axis=0)
return([sig_avg, sig_std])
def chunk_avg(img_data):
#take each row of the image
chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]
pool = multiprocessing.Pool()
result = pool.map(chunk_avg_map, chunks)
pool.close()
pool.join()
return result
if __name__ == "__main__":
main()
1 个回答
我觉得问题在于,处理每一小块数据所需的CPU时间相对于从工作进程中复制输入和输出所需的时间来说是很少的。我修改了你的示例代码,把输出分成16个相等的小块,并打印出运行chunk_avg_map()
开始和结束之间的CPU时间差(使用time.clock()
)。在我的系统上,每次运行的CPU时间稍微不到一秒,但整个进程组的CPU时间(系统时间加用户时间)却超过了38秒。每一小块大约有0.75秒的复制开销,这使得你的程序计算的速度只比multiprocessing
传输数据的速度快一点,导致一次最多只能使用两个工作进程。
如果我把代码修改成“输入数据”只是xrange(16)
,并在chunk_avg_map()
中构建随机数组,那么我看到系统和用户时间降到了大约19秒,并且所有16个工作进程同时执行。