并行化迭代循环

4 投票
2 回答
543 浏览
提问于 2025-04-17 18:15

我在代码运行上遇到了性能问题。第 # IIII 步骤耗时好几个小时。之前我会把 itertools.prodct 的结果存下来,但在一个用户的建议下,我现在不再使用 pro_data = product(array_b,array_a) 了。这虽然解决了我的内存问题,但仍然非常耗时。

我想用多线程或多进程来加速这个过程,任何建议我都非常感激。

解释一下。我有两个数组,分别包含粒子的 x 和 y 值。对于每个粒子(由两个坐标定义),我想用另一个粒子计算一个函数。为了得到所有组合,我使用了 itertools.product 方法,并对每个粒子进行循环。总共有超过 50000 个粒子,所以我需要计算 N*N/2 个组合。

提前谢谢你们!

import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement

def func(ar1,ar2,ar3,ar4): #example func that takes four arguments
  return (ar1*ar2**22+np.sin(ar3)+ar4)

def newdist(a):
  return func(a[0][0],a[0][1],a[1][0],a[1][1])    

x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram 

x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]

hist_data=np.zeros([len(x_edges)-1])

array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)

array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a,array_b):
  (result,bins) = np.histogram(newdist(i),bins=x_edges)
  hist_data+=result

hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()

-----编辑-----

我现在使用了这个代码:

def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
  def worker(array_ab, out_q):
      """ push result in queue """
      outdict = {}
      outdict = vec_chunk(array_ab, bins)
      out_q.put(outdict)
  out_q = mp.Queue()
  a = np.swapaxes(array_a, 0 ,1)
  b = np.swapaxes(array_b, 0 ,1)
  array_size_a=len(array_a)-(len(array_a)%d)
  array_size_b=len(array_b)-(len(array_b)%d)
  a_chunk = array_size_a / d
  b_chunk = array_size_b / d
  procs = []
  #prepare arrays for mp
  array_ab = np.empty((4, a_chunk, b_chunk))
  for j in xrange(d):
    for k in xrange(d):
      array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
      array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
      p = mp.Process(target=worker, args=(array_ab, out_q))
      procs.append(p)
      p.start()
  resultarray = np.empty(len(bins)-1)
  for i in range(d):
      resultarray+=out_q.get() 
  # Wait for all worker processes to finish
  for pro in procs:
      pro.join()
  print resultarray
  return resultarray

这里的问题是我无法控制进程的数量。我该如何使用 mp.Pool() 呢?

2 个回答

4

首先,我们来看看如何简单地处理你的问题。我感觉你想要的 array_aarray_b 是完全一样的,也就是粒子的坐标,不过我在这里把它们分开了。

我把你的代码变成了一个函数,这样可以更方便地测量时间:

def IIII(array_a, array_b, bins) :
    hist_data=np.zeros([len(bins)-1])
    for i in product(array_a,array_b):
        (result,bins) = np.histogram(newdist(i), bins=bins)
        hist_data+=result
    hist_data = np.array(map(float, hist_data))
    return hist_data

顺便说一下,你可以用一种更简单的方法生成你的样本数据,如下所示:

n = 100
array_a = np.random.uniform(0, 10, size=(n, 2))
array_b = np.random.uniform(0, 10, size=(n, 2))

接下来,我们需要对你的 func 进行向量化。我已经做到了,它可以处理任何形状为 (4, ...)array。为了节省内存,它在原地进行计算,并返回第一个平面,也就是 array[0]

def func_vectorized(a) :
    a[1] **= 22
    np.sin(a[2], out=a[2])
    a[0] *= a[1]
    a[0] += a[2]
    a[0] += a[3]
    return a[0]

有了这个函数,我们可以写出一个向量化的 IIII 版本:

def IIII_vec(array_a, array_b, bins) :
    array_ab = np.empty((4, len(array_a), len(array_b)))
    a = np.swapaxes(array_a, 0 ,1)
    b = np.swapaxes(array_b, 0 ,1)
    array_ab[[0, 1]] = a[:, :, None]
    array_ab[[2, 3]] = b[:, None, :]
    newdist = func_vectorized(array_ab)
    hist, _ = np.histogram(newdist, bins=bins)
    return hist

n = 100 点时,它们的返回结果是一样的:

In [2]: h1 = IIII(array_a, array_b, x_edges)

In [3]: h2 = IIII_bis(array_a, array_b, x_edges)

In [4]: np.testing.assert_almost_equal(h1, h2)

不过时间上的差异已经很明显了:

In [5]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 654 ms per loop

In [6]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.08 ms per loop

速度提升了300倍!如果你再用更长的样本数据,n = 1000,你会发现它们的表现都不太好,都是 n**2 的增长,所以300倍的提升依然存在:

In [10]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 68.2 s per loop

In [11]: %timeit IIII_bis(array_a, array_b, x_edges)
1 loops, best of 3: 229 ms per loop

所以你还是需要大约10分钟的处理时间,这和你当前的解决方案需要超过两天相比,已经算是不错了。

当然,要做到这一点,你需要把一个 (4, 50000, 50000) 的浮点数组放进内存,但我的系统无法处理这么大的数据。不过你可以通过分块处理来保持相对较快的速度。下面这个版本的 IIII_vec 将每个数组分成 d 个块。按照现在的写法,数组的长度应该能被 d 整除。虽然克服这个限制并不难,但会让真正的目的变得不清晰:

def IIII_vec_bis(array_a, array_b, bins, d=1) :
    a = np.swapaxes(array_a, 0 ,1)
    b = np.swapaxes(array_b, 0 ,1)
    a_chunk = len(array_a) // d
    b_chunk = len(array_b) // d
    array_ab = np.empty((4, a_chunk, b_chunk))
    hist_data = np.zeros((len(bins) - 1,))
    for j in xrange(d) :
        for k in xrange(d) :
            array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
            array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
            newdist = func_vectorized(array_ab)
            hist, _ = np.histogram(newdist, bins=bins)
            hist_data += hist
    return hist_data

首先,让我们检查一下它是否真的有效:

In [4]: h1 = IIII_vec(array_a, array_b, x_edges)

In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10)

In [6]: np.testing.assert_almost_equal(h1, h2)

接下来是一些时间测试。当 n = 100 时:

In [7]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.02 ms per loop

In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
100 loops, best of 3: 12 ms per loop

但是当你需要在内存中处理越来越大的数组时,分块处理开始显现出优势。当 n = 1000 时:

In [12]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 223 ms per loop

In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 208 ms per loop

n = 10000 时,我再也无法调用 IIII_vec,因为会出现 数组太大 的错误,但分块版本仍然在运行:

In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 21.8 s per loop

为了证明这是可行的,我用 n = 50000 运行了一次:

In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50)
1 loops, best of 1: 543 s per loop

所以大约9分钟的计算时间,这并不算太糟糕,毕竟它计算了25亿次交互。

4

使用向量化的numpy操作。用一次newdist()调用来替代对product()的循环,这样可以通过使用meshgrid()来创建参数。

为了让问题更高效,可以在array_aarray_b的切片上计算newdist(),这些切片对应于meshgrid()的子块。这里有一个使用切片和多进程的例子.

下面是另一个例子,展示了步骤:python循环 -> 向量化的numpy版本 -> 并行处理:

#!/usr/bin/env python
from __future__ import division
import math
import multiprocessing as mp
import numpy as np

try:
    from itertools import izip as zip
except ImportError:
    zip = zip # Python 3

def pi_loop(x, y, npoints):
    """Compute pi using Monte-Carlo method."""
    #  note: the method converges to pi very slowly.
    return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1) / npoints

def pi_vectorized(x, y, npoints):
    return 4 * ((x**2 + y**2) < 1).sum() / npoints # or just .mean()

def mp_init(x_shared, y_shared):
    global mp_x, mp_y
    mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy

def mp_pi(args):
    # perform computations on slices of mp_x, mp_y
    start, end = args
    x = mp_x[start:end] # no copy
    y = mp_y[start:end]
    return ((x**2 + y**2) < 1).sum()

def pi_parallel(x, y, npoints):
    # compute pi using multiple processes
    pool = mp.Pool(initializer=mp_init, initargs=[x, y])
    step = 100000
    slices = ((start, start + step) for start in range(0, npoints, step))
    return 4 * sum(pool.imap_unordered(mp_pi, slices)) / npoints

def main():
    npoints = 1000000

    # create shared arrays
    x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)]

    # initialize arrays
    x, y = map(np.frombuffer, [x_sh, y_sh])
    x[:] = np.random.uniform(size=npoints)
    y[:] = np.random.uniform(size=npoints)

    for f, a, b in [(pi_loop, x, y), 
                    (pi_vectorized, x, y), 
                    (pi_parallel, x_sh, y_sh)]:
        pi = f(a, b, npoints)
        precision = int(math.floor(math.log10(npoints)) / 2 - 1 + 0.5)
        print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi)))

if __name__=="__main__":
    main()

对于npoints = 10_000_000的时间性能:

pi_loop pi_vectorized pi_parallel 
   32.6         0.159       0.069 # seconds

这表明,主要的性能提升来自于将python循环转换为向量化的numpy版本。

撰写回答