用纯NumPy重写for循环以减少执行时间
我最近在讨论如何优化一个Python循环,用于科学应用。有人给我提供了一个很棒的建议,使用NumPy重写代码,这样执行时间减少了大约100倍!
不过,计算值实际上是嵌套在几个其他循环中的,因为它是在一个规则的位置网格上进行评估的。有没有类似的聪明的NumPy重写方法,可以让这个过程更快呢?
我觉得这个部分的性能提升可能不会那么明显,而且可能会有一些缺点,比如无法实时向用户报告计算进度,结果在计算结束前无法写入输出文件,还有可能在一次性处理这么大的数据时会有内存问题。有没有办法解决这些问题呢?
import numpy as np
import time
def reshape_vector(v):
b = np.empty((3,1))
for i in range(3):
b[i][0] = v[i]
return b
def unit_vectors(r):
return r / np.sqrt((r*r).sum(0))
def calculate_dipole(mu, r_i, mom_i):
relative = mu - r_i
r_unit = unit_vectors(relative)
A = 1e-7
num = A*(3*np.sum(mom_i*r_unit, 0)*r_unit - mom_i)
den = np.sqrt(np.sum(relative*relative, 0))**3
B = np.sum(num/den, 1)
return B
N = 20000 # number of dipoles
r_i = np.random.random((3,N)) # positions of dipoles
mom_i = np.random.random((3,N)) # moments of dipoles
a = np.random.random((3,3)) # three basis vectors for this crystal
n = [10,10,10] # points at which to evaluate sum
gamma_mu = 135.5 # a constant
t_start = time.clock()
for i in range(n[0]):
r_frac_x = np.float(i)/np.float(n[0])
r_test_x = r_frac_x * a[0]
for j in range(n[1]):
r_frac_y = np.float(j)/np.float(n[1])
r_test_y = r_frac_y * a[1]
for k in range(n[2]):
r_frac_z = np.float(k)/np.float(n[2])
r_test = r_test_x +r_test_y + r_frac_z * a[2]
r_test_fast = reshape_vector(r_test)
B = calculate_dipole(r_test_fast, r_i, mom_i)
omega = gamma_mu*np.sqrt(np.dot(B,B))
# write r_test, B and omega to a file
frac_done = np.float(i+1)/(n[0]+1)
t_elapsed = (time.clock()-t_start)
t_remain = (1-frac_done)*t_elapsed/frac_done
print frac_done*100,'% done in',t_elapsed/60.,'minutes...approximately',t_remain/60.,'minutes remaining'
2 个回答
你可以做的一个明显的事情是把这一行
r_test_fast = reshape_vector(r_test)
替换成
r_test_fast = r_test.reshape((3,1))
这样做可能不会对性能产生太大的影响,但无论如何,使用numpy自带的功能总比自己重新发明轮子要好。
一般来说,正如你可能已经注意到的,优化numpy的关键在于使用numpy的整体数组操作来表达算法,或者至少使用切片,而不是在Python代码中逐个遍历每个元素。阻止这种“向量化”的情况通常是所谓的循环依赖,也就是说,循环中的每次迭代都依赖于前一次迭代的结果。简单看一下你的代码,你并没有这种情况,所以应该可以很好地进行向量化。
编辑:一个解决方案
我还没有验证这个是否正确,但这应该能给你一个思路,告诉你该怎么做。
首先,使用这个cartesian()函数,我们将用到它。然后
def calculate_dipole_vect(mus, r_i, mom_i):
# Treat each mu sequentially
Bs = []
omega = []
for mu in mus:
rel = mu - r_i
r_norm = np.sqrt((rel * rel).sum(1))
r_unit = rel / r_norm[:, np.newaxis]
A = 1e-7
num = A*(3*np.sum(mom_i * r_unit, 0)*r_unit - mom_i)
den = r_norm ** 3
B = np.sum(num / den[:, np.newaxis], 0)
Bs.append(B)
omega.append(gamma_mu * np.sqrt(np.dot(B, B)))
return Bs, omega
# Transpose to get more "natural" ordering with row-major numpy
r_i = r_i.T
mom_i = mom_i.T
t_start = time.clock()
r_frac = cartesian((np.arange(n[0]) / float(n[0]),
np.arange(n[1]) / float(n[1]),
np.arange(n[2]) / float(n[2])))
r_test = np.dot(r_frac, a)
B, omega = calculate_dipole_vect(r_test, r_i, mom_i)
print 'Total time for vectorized: %f s' % (time.clock() - t_start)
在我的测试中,这实际上比我最开始的基于循环的方法稍慢。问题是,在问题的原始版本中,它已经通过对形状为(20000, 3)的数组进行整体数组操作进行了向量化,所以进一步的向量化并没有带来太多好处。实际上,可能会导致性能下降,可能是因为生成了很大的临时数组。
如果你对你的代码进行性能分析,你会发现99%的运行时间都花在了calculate_dipole
这个函数上。所以,减少其他循环的时间其实对整体执行时间的影响不大。如果你想让程序运行得更快,还是得把重点放在calculate_dipole
上。我试过用Cython来优化calculate_dipole
,结果整体运行时间减少了大约一半。可能还有其他方法可以进一步改善Cython代码。