Multiprocessing.Pool导致Numpy矩阵乘法变慢
我在玩 multiprocessing.Pool
和 Numpy
,但似乎错过了一些重要的点。为什么 pool
版本要慢得多?我查看了 htop
,看到有几个进程被创建,但它们都共享一个CPU,总使用率大约是100%。
$ cat test_multi.py
import numpy as np
from timeit import timeit
from multiprocessing import Pool
def mmul(matrix):
for i in range(100):
matrix = matrix * matrix
return matrix
if __name__ == '__main__':
matrices = []
for i in range(4):
matrices.append(np.random.random_integers(100, size=(1000, 1000)))
pool = Pool(8)
print timeit(lambda: map(mmul, matrices), number=20)
print timeit(lambda: pool.map(mmul, matrices), number=20)
$ python test_multi.py
16.0265390873
19.097837925
[更新]
- 改用
timeit
来测试进程的性能 - 用我的核心数量初始化 Pool
- 改变了计算方式,希望能增加计算量,减少内存传输
但还是没有变化。pool
版本依然慢,而且在 htop
中我看到只有一个核心在使用,虽然有多个进程在运行。
[更新2]
目前我正在阅读 @Jan-Philip Gehrcke 的建议,使用 multiprocessing.Process()
和 Queue
。但同时我想知道:
- 为什么我的例子在 tiago 上能工作?是什么原因导致它在我的机器上不工作1?
- 在我的例子代码中,进程之间有任何复制吗?我希望我的代码能让每个线程处理矩阵列表中的一个矩阵。
- 我的代码是个坏例子吗,因为我使用了
Numpy
?
我了解到,通常当别人知道我的最终目标时,能得到更好的答案,所以:我有很多文件,目前是串行加载和处理的。处理过程很耗CPU,所以我认为并行处理能带来很大提升。我的目标是并行调用一个分析文件的Python函数。此外,这个函数只是C代码的接口,我认为这也会有影响。
1 Ubuntu 12.04,Python 2.7.3,i7 860 @ 2.80 - 如果你需要更多信息,请留言。
[更新3]
这是Stefano的示例代码的结果。出于某种原因,没有速度提升。:/
testing with 16 matrices
base 4.27
1 5.07
2 4.76
4 4.71
8 4.78
16 4.79
testing with 32 matrices
base 8.82
1 10.39
2 10.58
4 10.73
8 9.46
16 9.54
testing with 64 matrices
base 17.38
1 19.34
2 19.62
4 19.59
8 19.39
16 19.34
[更新 4] 回复 Jan-Philip Gehrcke的评论
抱歉我没有说清楚。正如我在更新2中所写,我的主要目标是并行化许多对第三方Python库函数的串行调用。这个函数是一些C代码的接口。我被建议使用 Pool
,但这没有效果,所以我尝试了更简单的例子,就是上面展示的 numpy
示例。但即使这样我也没有实现性能提升,尽管在我看来它“明显可以并行化”。所以我认为我一定错过了什么重要的东西。这就是我提这个问题和悬赏的原因。
[更新 5]
感谢大家的宝贵意见。但阅读你们的回答只让我产生了更多问题。因此,我会先学习一下基础知识,等我对自己不知道的东西有更清晰的理解后再提出新的问题。
8 个回答
你的代码是正确的。我刚在我的系统上运行了一下(我的电脑有两个核心,并且支持超线程),得到了以下结果:
$ python test_multi.py
30.8623809814
19.3914041519
我查看了一下进程,果然,平行处理的部分显示有几个进程的工作效率接近100%。这可能是你系统或Python安装上的问题。
在这里,通信开销和计算速度之间的竞争是个不可预测的问题。你观察到的现象是完全正常的。是否能获得整体的速度提升,取决于很多因素,这些因素需要好好量化(就像你做的那样)。
那么,为什么在你的情况下,multiprocessing
会显得“意外地慢”呢? multiprocessing
的 map
和 map_async
函数实际上是通过管道在父进程和子进程之间来回传递Python对象,这个过程可能会花费相当多的时间。在这段时间里,子进程几乎没有事情可做,这就是你在 htop
中看到的情况。不同系统之间,管道传输的性能可能差别很大,这也是为什么有些人发现你的进程池代码比单个CPU代码快,而对你来说却不是(其他因素也可能影响结果,这只是为了说明这个现象的一个例子)。
你可以做些什么来加快速度呢?
在符合POSIX标准的系统上,不要对输入进行序列化。
如果你在Unix系统上,可以利用POSIX的进程分叉行为(写时复制)来避免父子进程之间的通信开销:
在父进程中创建你的工作输入(例如,一组大的矩阵),并将其放在一个全局可访问的变量中。然后通过调用
multiprocessing.Process()
来创建工作进程。在子进程中,从全局变量中获取工作输入。简单来说,这样可以让子进程直接访问父进程的内存,而不需要任何通信开销(*,下面会解释)。将结果通过例如multiprocessing.Queue
发送回父进程。这将节省大量的通信开销,特别是当输出相对于输入较小时。这种方法在Windows上不适用,因为在Windows上,multiprocessing.Process()
会创建一个全新的Python进程,而不会继承父进程的状态。利用numpy的多线程。
根据你的实际计算任务,使用
multiprocessing
可能根本没有帮助。如果你自己编译numpy并启用OpenMP指令,那么对大矩阵的操作可能会非常高效地进行多线程处理(并分布在多个CPU核心上;在这里,GIL并不是限制因素)。基本上,这是在numpy/scipy上下文中使用多个CPU核心的最有效方式。
*一般来说,子进程不能直接访问父进程的内存。然而,在 fork()
之后,父子进程处于相同的状态。将父进程的整个内存复制到RAM的另一个地方是很愚蠢的。因此,写时复制原则就派上用场了。只要子进程不改变其内存状态,它实际上是访问父进程的内存。只有在修改时,相关的内存部分才会被复制到子进程的内存空间中。
重要更新:
让我添加一段代码,它使用多个工作进程处理大量输入数据,并遵循“1. 在符合POSIX标准的系统上,不要对输入进行序列化。”的建议。此外,传回工作管理器(父进程)的信息量非常少。这个例子的重计算部分是一个单值分解。它可以大量利用OpenMP。我已经多次执行了这个例子:
- 一次使用1、2或4个工作进程,
OMP_NUM_THREADS=1
,这样每个工作进程的最大负载为100%。在这里,提到的工作进程数量与计算时间的比例几乎是线性的,整体的速度提升因子与参与的工作进程数量相对应。 - 一次使用1、2或4个工作进程,
OMP_NUM_THREADS=4
,这样每个进程的最大负载为400%(通过生成4个OpenMP线程)。我的机器有16个真实核心,所以4个进程每个最大400%的负载将几乎达到机器的最大性能。此时,比例不再完全线性,速度提升因子也不是参与的工作进程数量,但与OMP_NUM_THREADS=1
相比,绝对计算时间显著减少,并且随着工作进程数量的增加,时间仍然显著减少。 - 一次使用更大的输入数据,4个核心,
OMP_NUM_THREADS=4
。这导致系统平均负载为1253%。 - 一次使用与上次相同的设置,但
OMP_NUM_THREADS=5
。这导致系统平均负载为1598%,这表明我们充分利用了这台16核机器。然而,实际的计算墙时间与前一种情况相比并没有改善。
代码如下:
import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing
# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.
MATRIX_SIZE = 1000
MATRIX_COUNT = 16
def rnd_matrix():
offset = np.random.randint(1,10)
stretch = 2*np.random.rand()+0.1
return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)
print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]
def worker_function(result_queue, worker_index, chunk_boundary):
"""Work on a certain chunk of the globally defined `INPUT` list.
"""
result_chunk = []
for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
# Perform single value decomposition (CPU intense).
u, s, v = svd(m)
# Build single numeric value as output.
output = int(np.sum(s))
result_chunk.append(output)
result_queue.put((worker_index, result_chunk))
def work(n_workers=1):
def calc_chunksize(l, n):
"""Rudimentary function to calculate the size of chunks for equal
distribution of a list `l` among `n` workers.
"""
return int(math.ceil(len(l)/float(n)))
# Build boundaries (indices for slicing) for chunks of `INPUT` list.
chunk_size = calc_chunksize(INPUT, n_workers)
chunk_boundaries = [
(i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]
# When n_workers and input list size are of same order of magnitude,
# the above method might have created less chunks than workers available.
if n_workers != len(chunk_boundaries):
return None
result_queue = multiprocessing.Queue()
# Prepare child processes.
children = []
for worker_index in xrange(n_workers):
children.append(
multiprocessing.Process(
target=worker_function,
args=(
result_queue,
worker_index,
chunk_boundaries[worker_index],
)
)
)
# Run child processes.
for c in children:
c.start()
# Create result list of length of `INPUT`. Assign results upon arrival.
results = [None] * len(INPUT)
# Wait for all results to arrive.
for _ in xrange(n_workers):
worker_index, result_chunk = result_queue.get(block=True)
chunk_boundary = chunk_boundaries[worker_index]
# Store the chunk of results just received to the overall result list.
results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk
# Join child processes (clean up zombies).
for c in children:
c.join()
return results
def main():
durations = []
n_children = [1, 2, 4]
for n in n_children:
print "Crunching input with %s child(ren)." % n
t0 = time.time()
result = work(n)
if result is None:
continue
duration = time.time() - t0
print "Result computed by %s child process(es): %s" % (n, result)
print "Duration: %.2f s" % duration
durations.append(duration)
normalized_durations = [durations[0]/d for d in durations]
for n, normdur in zip(n_children, normalized_durations):
print "%s-children speedup: %.2f" % (n, normdur)
if __name__ == '__main__':
main()
输出结果:
$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps
关于你所有的进程都在同一个CPU上运行的情况,可以看看我在这里的回答.
在导入numpy
时,它会改变父进程的CPU亲和性,这样当你之后使用Pool
时,所有它创建的工作进程都会争抢同一个核心,而不是利用你机器上所有可用的核心。
你可以在导入numpy
后调用taskset
来重置CPU亲和性,这样就能使用所有核心:
import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool
def mmul(matrix):
for i in range(100):
matrix = matrix * matrix
return matrix
if __name__ == '__main__':
matrices = []
for i in range(4):
matrices.append(np.random.random_integers(100, size=(1000, 1000)))
print timeit(lambda: map(mmul, matrices), number=20)
# after importing numpy, reset the CPU affinity of the parent process so
# that it will use all cores
os.system("taskset -p 0xff %d" % os.getpid())
pool = Pool(8)
print timeit(lambda: pool.map(mmul, matrices), number=20)
输出:
$ python tmp.py
12.4765810966
pid 29150's current affinity mask: 1
pid 29150's new affinity mask: ff
13.4136221409
如果你在运行这个脚本时使用top
来查看CPU使用情况,你应该能看到在执行“并行”部分时,它会使用你所有的核心。正如其他人指出的,在你最初的例子中,数据序列化、进程创建等的开销可能会超过并行化带来的任何好处。
编辑:我怀疑单个进程看起来总是更快的部分原因是numpy
可能有一些加速逐元素矩阵乘法的技巧,而在任务分散到多个核心时无法使用。
例如,如果我只是用普通的Python列表来计算斐波那契数列,我可以从并行化中获得巨大的速度提升。同样,如果我以不利用向量化的方式进行逐元素乘法,我也能在并行版本中获得类似的速度提升:
import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool
def fib(dummy):
n = [1,1]
for ii in xrange(100000):
n.append(n[-1]+n[-2])
def silly_mult(matrix):
for row in matrix:
for val in row:
val * val
if __name__ == '__main__':
dt = timeit(lambda: map(fib, xrange(10)), number=10)
print "Fibonacci, non-parallel: %.3f" %dt
matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
dt = timeit(lambda: map(silly_mult, matrices), number=10)
print "Silly matrix multiplication, non-parallel: %.3f" %dt
# after importing numpy, reset the CPU affinity of the parent process so
# that it will use all CPUS
os.system("taskset -p 0xff %d" % os.getpid())
pool = Pool(8)
dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
print "Fibonacci, parallel: %.3f" %dt
dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
print "Silly matrix multiplication, parallel: %.3f" %dt
输出:
$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528's current affinity mask: 1
pid 29528's new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163