并行化迭代循环
我在代码运行上遇到了性能问题。第 # IIII
步骤耗时好几个小时。之前我会把 itertools.prodct 的结果存下来,但在一个用户的建议下,我现在不再使用 pro_data = product(array_b,array_a)
了。这虽然解决了我的内存问题,但仍然非常耗时。
我想用多线程或多进程来加速这个过程,任何建议我都非常感激。
解释一下。我有两个数组,分别包含粒子的 x 和 y 值。对于每个粒子(由两个坐标定义),我想用另一个粒子计算一个函数。为了得到所有组合,我使用了 itertools.product 方法,并对每个粒子进行循环。总共有超过 50000 个粒子,所以我需要计算 N*N/2 个组合。
提前谢谢你们!
import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement
def func(ar1,ar2,ar3,ar4): #example func that takes four arguments
return (ar1*ar2**22+np.sin(ar3)+ar4)
def newdist(a):
return func(a[0][0],a[0][1],a[1][0],a[1][1])
x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram
x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]
hist_data=np.zeros([len(x_edges)-1])
array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)
array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a,array_b):
(result,bins) = np.histogram(newdist(i),bins=x_edges)
hist_data+=result
hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()
-----编辑-----
我现在使用了这个代码:
def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
def worker(array_ab, out_q):
""" push result in queue """
outdict = {}
outdict = vec_chunk(array_ab, bins)
out_q.put(outdict)
out_q = mp.Queue()
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
array_size_a=len(array_a)-(len(array_a)%d)
array_size_b=len(array_b)-(len(array_b)%d)
a_chunk = array_size_a / d
b_chunk = array_size_b / d
procs = []
#prepare arrays for mp
array_ab = np.empty((4, a_chunk, b_chunk))
for j in xrange(d):
for k in xrange(d):
array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
p = mp.Process(target=worker, args=(array_ab, out_q))
procs.append(p)
p.start()
resultarray = np.empty(len(bins)-1)
for i in range(d):
resultarray+=out_q.get()
# Wait for all worker processes to finish
for pro in procs:
pro.join()
print resultarray
return resultarray
这里的问题是我无法控制进程的数量。我该如何使用 mp.Pool()
呢?
2 个回答
首先,我们来看看如何简单地处理你的问题。我感觉你想要的 array_a
和 array_b
是完全一样的,也就是粒子的坐标,不过我在这里把它们分开了。
我把你的代码变成了一个函数,这样可以更方便地测量时间:
def IIII(array_a, array_b, bins) :
hist_data=np.zeros([len(bins)-1])
for i in product(array_a,array_b):
(result,bins) = np.histogram(newdist(i), bins=bins)
hist_data+=result
hist_data = np.array(map(float, hist_data))
return hist_data
顺便说一下,你可以用一种更简单的方法生成你的样本数据,如下所示:
n = 100
array_a = np.random.uniform(0, 10, size=(n, 2))
array_b = np.random.uniform(0, 10, size=(n, 2))
接下来,我们需要对你的 func
进行向量化。我已经做到了,它可以处理任何形状为 (4, ...)
的 array
。为了节省内存,它在原地进行计算,并返回第一个平面,也就是 array[0]
。
def func_vectorized(a) :
a[1] **= 22
np.sin(a[2], out=a[2])
a[0] *= a[1]
a[0] += a[2]
a[0] += a[3]
return a[0]
有了这个函数,我们可以写出一个向量化的 IIII
版本:
def IIII_vec(array_a, array_b, bins) :
array_ab = np.empty((4, len(array_a), len(array_b)))
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
array_ab[[0, 1]] = a[:, :, None]
array_ab[[2, 3]] = b[:, None, :]
newdist = func_vectorized(array_ab)
hist, _ = np.histogram(newdist, bins=bins)
return hist
当 n = 100
点时,它们的返回结果是一样的:
In [2]: h1 = IIII(array_a, array_b, x_edges)
In [3]: h2 = IIII_bis(array_a, array_b, x_edges)
In [4]: np.testing.assert_almost_equal(h1, h2)
不过时间上的差异已经很明显了:
In [5]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 654 ms per loop
In [6]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.08 ms per loop
速度提升了300倍!如果你再用更长的样本数据,n = 1000
,你会发现它们的表现都不太好,都是 n**2
的增长,所以300倍的提升依然存在:
In [10]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 68.2 s per loop
In [11]: %timeit IIII_bis(array_a, array_b, x_edges)
1 loops, best of 3: 229 ms per loop
所以你还是需要大约10分钟的处理时间,这和你当前的解决方案需要超过两天相比,已经算是不错了。
当然,要做到这一点,你需要把一个 (4, 50000, 50000)
的浮点数组放进内存,但我的系统无法处理这么大的数据。不过你可以通过分块处理来保持相对较快的速度。下面这个版本的 IIII_vec
将每个数组分成 d
个块。按照现在的写法,数组的长度应该能被 d
整除。虽然克服这个限制并不难,但会让真正的目的变得不清晰:
def IIII_vec_bis(array_a, array_b, bins, d=1) :
a = np.swapaxes(array_a, 0 ,1)
b = np.swapaxes(array_b, 0 ,1)
a_chunk = len(array_a) // d
b_chunk = len(array_b) // d
array_ab = np.empty((4, a_chunk, b_chunk))
hist_data = np.zeros((len(bins) - 1,))
for j in xrange(d) :
for k in xrange(d) :
array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
newdist = func_vectorized(array_ab)
hist, _ = np.histogram(newdist, bins=bins)
hist_data += hist
return hist_data
首先,让我们检查一下它是否真的有效:
In [4]: h1 = IIII_vec(array_a, array_b, x_edges)
In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10)
In [6]: np.testing.assert_almost_equal(h1, h2)
接下来是一些时间测试。当 n = 100
时:
In [7]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.02 ms per loop
In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
100 loops, best of 3: 12 ms per loop
但是当你需要在内存中处理越来越大的数组时,分块处理开始显现出优势。当 n = 1000
时:
In [12]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 223 ms per loop
In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 208 ms per loop
当 n = 10000
时,我再也无法调用 IIII_vec
,因为会出现 数组太大 的错误,但分块版本仍然在运行:
In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 21.8 s per loop
为了证明这是可行的,我用 n = 50000
运行了一次:
In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50)
1 loops, best of 1: 543 s per loop
所以大约9分钟的计算时间,这并不算太糟糕,毕竟它计算了25亿次交互。
使用向量化的numpy操作。用一次newdist()
调用来替代对product()
的循环,这样可以通过使用meshgrid()
来创建参数。
为了让问题更高效,可以在array_a
和array_b
的切片上计算newdist()
,这些切片对应于meshgrid()
的子块。这里有一个使用切片和多进程的例子.
下面是另一个例子,展示了步骤:python循环 -> 向量化的numpy版本 -> 并行处理:
#!/usr/bin/env python
from __future__ import division
import math
import multiprocessing as mp
import numpy as np
try:
from itertools import izip as zip
except ImportError:
zip = zip # Python 3
def pi_loop(x, y, npoints):
"""Compute pi using Monte-Carlo method."""
# note: the method converges to pi very slowly.
return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1) / npoints
def pi_vectorized(x, y, npoints):
return 4 * ((x**2 + y**2) < 1).sum() / npoints # or just .mean()
def mp_init(x_shared, y_shared):
global mp_x, mp_y
mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy
def mp_pi(args):
# perform computations on slices of mp_x, mp_y
start, end = args
x = mp_x[start:end] # no copy
y = mp_y[start:end]
return ((x**2 + y**2) < 1).sum()
def pi_parallel(x, y, npoints):
# compute pi using multiple processes
pool = mp.Pool(initializer=mp_init, initargs=[x, y])
step = 100000
slices = ((start, start + step) for start in range(0, npoints, step))
return 4 * sum(pool.imap_unordered(mp_pi, slices)) / npoints
def main():
npoints = 1000000
# create shared arrays
x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)]
# initialize arrays
x, y = map(np.frombuffer, [x_sh, y_sh])
x[:] = np.random.uniform(size=npoints)
y[:] = np.random.uniform(size=npoints)
for f, a, b in [(pi_loop, x, y),
(pi_vectorized, x, y),
(pi_parallel, x_sh, y_sh)]:
pi = f(a, b, npoints)
precision = int(math.floor(math.log10(npoints)) / 2 - 1 + 0.5)
print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi)))
if __name__=="__main__":
main()
对于npoints = 10_000_000
的时间性能:
pi_loop pi_vectorized pi_parallel
32.6 0.159 0.069 # seconds
这表明,主要的性能提升来自于将python循环转换为向量化的numpy版本。