Python多进程Pool.map不比单次调用函数快

4 投票
1 回答
2926 浏览
提问于 2025-04-18 04:48

我有一个非常大的字符串列表(最初来自一个文本文件),我需要用Python来处理这些数据。最终,我想实现一种类似于“映射-归约”的并行处理方式。

我写了一个“映射器”函数,并把它传给了multiprocessing.Pool.map(),但它的运行时间和直接用完整数据调用映射器函数的时间差不多。我一定是哪里做错了。

我尝试了多种方法,结果都差不多。

def initial_map(lines):
    results = []
    for line in lines:
        processed = # process line (O^(1) operation)
        results.append(processed)
    return results

def chunks(l, n):
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

if __name__ == "__main__":
    lines = list(open("../../log.txt", 'r'))
    pool = Pool(processes=8)
    partitions = chunks(lines, len(lines)/8)
    results = pool.map(initial_map, partitions, 1)

这个chunks函数会把原始的行列表分成几个子列表,然后交给pool.map(),接着应该把这8个子列表分给8个不同的进程,让它们通过映射器函数处理。当我运行这个时,我能看到我的8个核心都达到了100%的使用率。但整个过程还是花了22到24秒。

当我简单地运行这个(单进程/单线程):

lines = list(open("../../log.txt", 'r'))
results = initial_map(results)

大约也需要同样的时间,差不多24秒。我只看到一个进程的CPU使用率达到了100%。

我还尝试让池自己分割行,让映射器函数一次只处理一行,结果也是差不多。

def initial_map(line):
    processed = # process line (O^(1) operation)
    return processed

if __name__ == "__main__":
    lines = list(open("../../log.txt", 'r'))
    pool = Pool(processes=8)
    pool.map(initial_map, lines)

大约22秒。

为什么会这样呢?并行处理应该能更快才对,难道不是吗?

1 个回答

1

如果每次处理的工作量很小,那么你花在和子进程沟通上的时间就会占用很大一部分,这样是很浪费的。相反,试着把更大一块的数据传给处理函数。可以参考下面的例子:

slices = (data[i:i+100] for i in range(0, len(data), 100)

def process_slice(data):
    return [initial_data(x) for x in data]

pool.map(process_slice, slices)

# and then itertools.chain the output to flatten it

(我现在没有电脑,所以不能给你一个完整的解决方案,也无法验证我说的内容)

编辑:或者看看你问题下@ubomb的第三条评论。

撰写回答