使用numpy加速for循环
这个接下来的 for 循环怎么用 numpy 提速呢?我猜这里可以用一些高级的索引技巧,但我不知道具体用哪个(可以用 einsum 吗?)。
a=0
for i in range(len(b)):
a+=numpy.mean(C[d,e,f+b[i]])*g[i]
补充说明:C
是一个形状大约为 (20, 1600, 500)
的 numpy 三维数组。d,e,f
是一些“有趣”的点的索引(d,e,f
的长度相同,大约是 900)。b
和 g
的长度也相同(大约是 50)。我们要对 C
中所有用索引 d,e,f+b[i]
指向的点取平均值。
4 个回答
0
在结构上,你能期待的唯一速度提升可能来自以下代码:
#Initialize a 4-D array
aggregated = numpy.zeros((len(d), len(e), len(f), len(b)))
#Populate it by the shifted copies of C
for i in range(len(b)):
aggregated[:, :, :, i] = C[d, e, f + b[i]]
#Compute the mean on the first three axes
means = numpy.mean(aggregated, axis=(0, 1, 2))
#Multiply term-by-term by g (be careful that means and g have the same size!) and sum
a = numpy.sum(means * g)
不过这并不保证计算会更快,实际上可能会更慢,原因有以下几点:
- 填充这个四维数组的过程是有成本的,因为它涉及到内存的复制
- b的值非常小,所以你也不会获得太大的提升。如果b的值更大,这样的做法可能会变得有趣,只要d、e、f的值也相应变小
无论如何,你应该对这两种方案进行性能测试。你也可以尝试使用像Cython这样的工具来执行for循环,但这似乎有点过于复杂了。
1
这和循环版本很相似:
np.sum(np.mean(C[d,e,f+b[:,None]], axis=1) * g)
你还可以把求和和乘法结合起来,变成一个点积:
C[d,e,f+b[:,None]].mean(1).dot(g)
不过在时间上,这似乎没什么影响;索引操作是所有操作中耗时最多的(至少在Numpy 1.8.0版本中)。相比之下,原始代码中的循环开销就显得微不足道了。
3
你可以试试下面这个小技巧:
C[d, e][:, np.add.outer(f, b)].dot(g).diagonal().mean()
如果想进一步优化,可以提前拿到那些将要形成对角线的元素:
C[d, e][np.arange(len(d))[:, None], np.add.outer(f, b)].dot(g).mean()
1
时间记录
这两个会话都是用
In [1]: C = np.random.rand(20,1600,500)
In [2]: d = np.random.randint(0, 20, size=900)
In [3]: e = np.random.randint(1600, size=900)
In [4]: f = np.random.randint(400, size=900)
In [5]: b = np.random.randint(100, size=50)
In [6]: g = np.random.rand(50)
Numpy 1.9.0
In [7]: %timeit C[d,e,f + b[:,np.newaxis]].mean(axis=1).dot(g)
1000 loops, best of 3: 942 µs per loop
In [8]: %timeit C[d[:,np.newaxis],e[:, np.newaxis],f[:, np.newaxis] + b].mean(axis=0).dot(g)
1000 loops, best of 3: 762 µs per loop
In [9]: %%timeit
...: a = 0
...: for i in range(len(b)):
...: a += np.mean(C[d, e, f + b[i]]) * g[i]
...:
100 loops, best of 3: 2.25 ms per loop
In [10]: np.__version__
Out[10]: '1.9.0'
In [11]: %%timeit
(C.ravel()[np.ravel_multi_index((d[:,np.newaxis],
e[:,np.newaxis],
f[:,np.newaxis] + b), dims=C.shape)]
.mean(axis=0).dot(g))
....:
1000 loops, best of 3: 940 µs per loop
Numpy 1.8.2
In [7]: %timeit C[d,e,f + b[:,np.newaxis]].mean(axis=1).dot(g)
100 loops, best of 3: 2.81 ms per loop
In [8]: %timeit C[d[:,np.newaxis],e[:, np.newaxis],f[:, np.newaxis] + b].mean(axis=0).dot(g)
100 loops, best of 3: 2.7 ms per loop
In [9]: %%timeit
...: a = 0
...: for i in range(len(b)):
...: a += np.mean(C[d, e, f + b[i]]) * g[i]
...:
100 loops, best of 3: 4.12 ms per loop
In [10]: np.__version__
Out[10]: '1.8.2'
In [51]: %%timeit
(C.ravel()[np.ravel_multi_index((d[:,np.newaxis],
e[:,np.newaxis],
f[:,np.newaxis] + b), dims=C.shape)]
.mean(axis=0).dot(g))
....:
1000 loops, best of 3: 1.4 ms per loop
描述
你可以用坐标广播的技巧,从一开始就创建一个 50x900 的数组:
In [158]: C[d,e,f + b[:, np.newaxis]].shape
Out[158]: (50, 900)
从那时起,使用 mean
和 dot
就能达到目标:
In [159]: C[d,e,f + b[:, np.newaxis]].mean(axis=1).dot(g)
Out[159]: 13.582349962518611
In [160]:
a = 0
for i in range(len(b)):
a += np.mean(C[d, e, f + b[i]]) * g[i]
print(a)
.....:
13.5823499625
而且这个方法比用循环的版本快大约 3.3 倍:
In [161]: %timeit C[d,e,f + b[:, np.newaxis]].mean(axis=1).dot(g)
1000 loops, best of 3: 585 µs per loop
In [162]: %%timeit
a = 0
for i in range(len(b)):
a += np.mean(C[d, e, f + b[i]]) * g[i]
.....:
1000 loops, best of 3: 1.95 ms per loop
这个数组的大小很大,所以你需要考虑 CPU 缓存。我不能确切说 np.sum
是怎么遍历这个数组的,但在二维数组中,总是有一种稍微更好的方式(当你选择的下一个元素在内存中是相邻的)和一种稍微差一点的方式(当下一个元素在跨越步幅时找到)。让我们看看在索引时转置数组能否带来更多的好处:
In [196]: C[d[:,np.newaxis], e[:,np.newaxis], f[:,np.newaxis] + b].mean(axis=0).dot(g)
Out[196]: 13.582349962518608
In [197]: %timeit C[d[:,np.newaxis], e[:,np.newaxis], f[:,np.newaxis] + b].mean(axis=0).dot(g)
1000 loops, best of 3: 461 µs per loop
这样比用循环快了 4.2 倍。