使用numpy加速for循环

2 投票
4 回答
2583 浏览
提问于 2025-04-27 22:28

这个接下来的 for 循环怎么用 numpy 提速呢?我猜这里可以用一些高级的索引技巧,但我不知道具体用哪个(可以用 einsum 吗?)。

a=0
for i in range(len(b)):
    a+=numpy.mean(C[d,e,f+b[i]])*g[i]

补充说明:C 是一个形状大约为 (20, 1600, 500) 的 numpy 三维数组。d,e,f 是一些“有趣”的点的索引(d,e,f 的长度相同,大约是 900)。bg 的长度也相同(大约是 50)。我们要对 C 中所有用索引 d,e,f+b[i] 指向的点取平均值。

暂无标签

4 个回答

0

在结构上,你能期待的唯一速度提升可能来自以下代码:

#Initialize a 4-D array
aggregated = numpy.zeros((len(d), len(e), len(f), len(b)))
#Populate it by the shifted copies of C
for i in range(len(b)):
    aggregated[:, :, :, i] = C[d, e, f + b[i]]

#Compute the mean on the first three axes
means = numpy.mean(aggregated, axis=(0, 1, 2))
#Multiply term-by-term by g (be careful that means and g have the same size!) and sum
a = numpy.sum(means * g)

不过这并不保证计算会更快,实际上可能会更慢,原因有以下几点:

  • 填充这个四维数组的过程是有成本的,因为它涉及到内存的复制
  • b的值非常小,所以你也不会获得太大的提升。如果b的值更大,这样的做法可能会变得有趣,只要d、e、f的值也相应变小

无论如何,你应该对这两种方案进行性能测试。你也可以尝试使用像Cython这样的工具来执行for循环,但这似乎有点过于复杂了。

1

这和循环版本很相似:

np.sum(np.mean(C[d,e,f+b[:,None]], axis=1) * g)

你还可以把求和和乘法结合起来,变成一个点积:

C[d,e,f+b[:,None]].mean(1).dot(g)

不过在时间上,这似乎没什么影响;索引操作是所有操作中耗时最多的(至少在Numpy 1.8.0版本中)。相比之下,原始代码中的循环开销就显得微不足道了。

3

你可以试试下面这个小技巧:

C[d, e][:, np.add.outer(f, b)].dot(g).diagonal().mean()

如果想进一步优化,可以提前拿到那些将要形成对角线的元素:

C[d, e][np.arange(len(d))[:, None], np.add.outer(f, b)].dot(g).mean()
1

时间记录

这两个会话都是用

In [1]: C = np.random.rand(20,1600,500)

In [2]: d = np.random.randint(0, 20, size=900)

In [3]: e = np.random.randint(1600, size=900)

In [4]: f = np.random.randint(400, size=900)

In [5]: b = np.random.randint(100, size=50)

In [6]: g = np.random.rand(50)

Numpy 1.9.0

In [7]: %timeit C[d,e,f + b[:,np.newaxis]].mean(axis=1).dot(g)
1000 loops, best of 3: 942 µs per loop

In [8]: %timeit C[d[:,np.newaxis],e[:, np.newaxis],f[:, np.newaxis] + b].mean(axis=0).dot(g)
1000 loops, best of 3: 762 µs per loop

In [9]: %%timeit                                               
   ...: a = 0
   ...: for i in range(len(b)):                                     
   ...:     a += np.mean(C[d, e, f + b[i]]) * g[i]
   ...: 
100 loops, best of 3: 2.25 ms per loop

In [10]: np.__version__
Out[10]: '1.9.0'

In [11]: %%timeit
(C.ravel()[np.ravel_multi_index((d[:,np.newaxis],
                                 e[:,np.newaxis],
                                 f[:,np.newaxis] + b), dims=C.shape)]
 .mean(axis=0).dot(g))
   ....: 
1000 loops, best of 3: 940 µs per loop

Numpy 1.8.2

In [7]: %timeit C[d,e,f + b[:,np.newaxis]].mean(axis=1).dot(g)
100 loops, best of 3: 2.81 ms per loop

In [8]: %timeit C[d[:,np.newaxis],e[:, np.newaxis],f[:, np.newaxis] + b].mean(axis=0).dot(g)
100 loops, best of 3: 2.7 ms per loop

In [9]: %%timeit                                               
   ...: a = 0
   ...: for i in range(len(b)):                                     
   ...:     a += np.mean(C[d, e, f + b[i]]) * g[i]
   ...: 
100 loops, best of 3: 4.12 ms per loop

In [10]: np.__version__
Out[10]: '1.8.2'

In [51]: %%timeit
(C.ravel()[np.ravel_multi_index((d[:,np.newaxis],
                                 e[:,np.newaxis],
                                 f[:,np.newaxis] + b), dims=C.shape)]
 .mean(axis=0).dot(g))
   ....: 
1000 loops, best of 3: 1.4 ms per loop

描述

你可以用坐标广播的技巧,从一开始就创建一个 50x900 的数组:

In [158]: C[d,e,f + b[:, np.newaxis]].shape
Out[158]: (50, 900)

从那时起,使用 meandot 就能达到目标:

In [159]: C[d,e,f + b[:, np.newaxis]].mean(axis=1).dot(g)
Out[159]: 13.582349962518611

In [160]: 
a = 0
for i in range(len(b)):       
    a += np.mean(C[d, e, f + b[i]]) * g[i]
print(a)
   .....: 
13.5823499625

而且这个方法比用循环的版本快大约 3.3 倍:

In [161]: %timeit C[d,e,f + b[:, np.newaxis]].mean(axis=1).dot(g)
1000 loops, best of 3: 585 µs per loop

In [162]: %%timeit                                               
a = 0
for i in range(len(b)):                                     
    a += np.mean(C[d, e, f + b[i]]) * g[i]
   .....: 
1000 loops, best of 3: 1.95 ms per loop

这个数组的大小很大,所以你需要考虑 CPU 缓存。我不能确切说 np.sum 是怎么遍历这个数组的,但在二维数组中,总是有一种稍微更好的方式(当你选择的下一个元素在内存中是相邻的)和一种稍微差一点的方式(当下一个元素在跨越步幅时找到)。让我们看看在索引时转置数组能否带来更多的好处:

In [196]: C[d[:,np.newaxis], e[:,np.newaxis], f[:,np.newaxis] + b].mean(axis=0).dot(g)
Out[196]: 13.582349962518608

In [197]: %timeit C[d[:,np.newaxis], e[:,np.newaxis], f[:,np.newaxis] + b].mean(axis=0).dot(g)
1000 loops, best of 3: 461 µs per loop

这样比用循环快了 4.2 倍。

撰写回答