Python multiprocessing Pool.map 为什么会调用 aquire?

12 投票
1 回答
1925 浏览
提问于 2025-04-16 04:28

我有一个大小为640x480的numpy数组,每个数组里有630张图片。总的数组大小就是630x480x640。我想生成一张平均图像,并计算所有630张图片中每个像素的标准差。

这个操作其实很简单,可以通过下面的代码实现:

avg_image = numpy.mean(img_array, axis=0)
std_image = numpy.std(img_array, axis=0)

不过,因为我需要处理大约50个这样的数组,而且我的工作站有8个核心和16个线程,所以我想贪心一点,使用multiprocessing.Pool来并行处理。

于是我做了以下操作:

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

但是,我发现速度提升并不明显。我在chunk_avg_map里加了打印语句,发现一次只启动了一两个进程,而不是我预期的16个。

接着,我在iPython中用cProfile分析了我的代码:

%prun current_image_anal.main()

结果显示,绝大部分时间都花在了调用acquire上:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1527  309.755    0.203  309.755    0.203 {built-in method acquire}

我知道这和锁定有关,但我不明白为什么我的代码会这样。有没有人能给我一些建议?

[编辑] 按要求,这里有一个可以运行的脚本,展示了这个问题。你可以用任何方式进行分析,但我发现大部分时间都花在了调用acquire上,而不是我预期的平均值或标准差。

#!/usr/bin/python
import numpy
import multiprocessing

def main():
    fake_images = numpy.random.randint(0,2**14,(630,480,640))
    chunk_avg(fake_images)

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    main()

1 个回答

7

我觉得问题在于,处理每一小块数据所需的CPU时间相对于从工作进程中复制输入和输出所需的时间来说是很少的。我修改了你的示例代码,把输出分成16个相等的小块,并打印出运行chunk_avg_map()开始和结束之间的CPU时间差(使用time.clock())。在我的系统上,每次运行的CPU时间稍微不到一秒,但整个进程组的CPU时间(系统时间加用户时间)却超过了38秒。每一小块大约有0.75秒的复制开销,这使得你的程序计算的速度只比multiprocessing传输数据的速度快一点,导致一次最多只能使用两个工作进程。

如果我把代码修改成“输入数据”只是xrange(16),并在chunk_avg_map()中构建随机数组,那么我看到系统和用户时间降到了大约19秒,并且所有16个工作进程同时执行。

撰写回答