意外缓慢的cython卷积代码
我需要快速计算一个矩阵,这个矩阵的每个元素是通过将一个滤波器和一个向量进行卷积得到的。然后我会对得到的向量进行下采样,最后再和另一个向量做点积。具体来说,我想计算
M = [conv(e_j, f)*P_i*v_i ]_{i,j},
其中 i 从 1 到 n,j 从 1 到 m。这里的 e_j 是一个大小为 n 的指示向量,只有在第 j 列有一个 1,f 是长度为 s 的滤波器,P_i 是一个 (n+s-1)-行 k 列的矩阵,用来从卷积结果中取出合适的 k 个元素,而 v_i 是一个长度为 k 的列向量。
计算 M 的每个元素需要 O(n*s) 次操作,所以计算整个 M 需要 O(n*s*n*m) 次操作。比如当 n=6,m=7,s=3 时,我电脑的一个核心(8GLOPs)应该能在大约 0.094 微秒内计算出 M。然而,我的一个非常简单的 Cython 实现,按照 Cython 文档中的示例,却需要超过 2 毫秒来计算这些参数的示例。这差不多有 4 个数量级的差别!
这里有一个包含 Cython 实现和测试代码的 shar 文件。你可以把它复制粘贴到一个文件中,然后在一个干净的目录下运行 'bash <fname>' 来获取代码,然后运行 'bash ./test.sh' 来看看性能有多糟糕。
cat > fastcalcM.pyx <<'EOF'
import numpy as np
cimport numpy as np
cimport cython
from scipy.signal import convolve
DTYPE=np.float32
ctypedef np.float32_t DTYPE_t
@cython.boundscheck(False)
def calcM(np.ndarray[DTYPE_t, ndim=1, negative_indices=False] filtertaps, int
n, int m, np.ndarray[np.int_t, ndim=2, negative_indices=False]
keep_indices, np.ndarray[DTYPE_t, ndim=2, negative_indices=False] V):
""" Computes a numrows-by-k matrix M whose entries satisfy
M_{i,k} = [conv(e_j, f)^T * P_i * v_i],
where v_i^T is the i-th row of V, and P_i samples the entries from
conv(e_j, f)^T indicated by the ith row of the keep_indices matrix """
cdef int k = keep_indices.shape[1]
cdef np.ndarray M = np.zeros((n, m), dtype=DTYPE)
cdef np.ndarray ej = np.zeros((m,), dtype=DTYPE)
cdef np.ndarray convolution
cdef int rowidx, colidx, kidx
for rowidx in range(n):
for colidx in range(m):
ej[colidx] = 1
convolution = convolve(ej, filtertaps, mode='full')
for kidx in range(k):
M[rowidx, colidx] += convolution[keep_indices[rowidx, kidx]] * V[rowidx, kidx]
ej[colidx] = 0
return M
EOF
#-----------------------------------------------------------------------------
cat > test_calcM.py << 'EOF'
import numpy as np
from fastcalcM import calcM
filtertaps = np.array([-1, 2, -1]).astype(np.float32)
n, m = 6, 7
keep_indices = np.array([[1, 3],
[4, 5],
[2, 2],
[5, 5],
[3, 4],
[4, 5]]).astype(np.int)
V = np.random.random_integers(-5, 5, size=(6, 2)).astype(np.float32)
print calcM(filtertaps, n, m, keep_indices, V)
EOF
#-----------------------------------------------------------------------------
cat > test.sh << 'EOF'
python setup.py build_ext --inplace
echo -e "%run test_calcM\n%timeit calcM(filtertaps, n, m, keep_indices, V)" > script.ipy
ipython script.ipy
EOF
#-----------------------------------------------------------------------------
cat > setup.py << 'EOF'
from distutils.core import setup
from Cython.Build import cythonize
import numpy
setup(
name="Fast convolutions",
include_dirs = [numpy.get_include()],
ext_modules = cythonize("fastcalcM.pyx")
)
EOF
我想也许是调用 scipy 的卷积函数导致了这个问题(我不确定 Cython 和 scipy 是否兼容),所以我按照 Cython 文档中的示例实现了自己的卷积代码,但结果是整体代码变得慢了大约 10 倍。
有没有什么想法可以让我更接近理论上可能的速度,或者为什么差距会这么大?
1 个回答
首先,M
、eg
和convolution
的类型设置让快速索引变得困难。其实你做的这些类型设置并没有太大帮助。
不过这也没关系,因为你有两个额外的开销。第一个是Cython和Python类型之间的转换。如果你想经常把数组传给Python,最好保持无类型的数组,这样就不用每次都转换。把这个直接移到Python里就能加快速度(从1毫秒变成0.65微秒)。
然后我进行了性能分析:
Line # Hits Time Per Hit % Time Line Contents
==============================================================
15 def calcM(filtertaps, n, m, keep_indices, V):
16 4111 3615 0.9 0.1 k = keep_indices.shape[1]
17 4111 8024 2.0 0.1 M = np.zeros((n, m), dtype=np.float32)
18 4111 6090 1.5 0.1 ej = np.zeros((m,), dtype=np.float32)
19
20 28777 18690 0.6 0.3 for rowidx in range(n):
21 197328 123284 0.6 2.2 for colidx in range(m):
22 172662 112348 0.7 2.0 ej[colidx] = 1
23 172662 4076225 23.6 73.6 convolution = convolve(ej, filtertaps, mode='full')
24 517986 395513 0.8 7.1 for kidx in range(k):
25 345324 668309 1.9 12.1 M[rowidx, colidx] += convolution[keep_indices[rowidx, kidx]] * V[rowidx, kidx]
26 172662 120271 0.7 2.2 ej[colidx] = 0
27
28 4111 2374 0.6 0.0 return M
在考虑其他问题之前,先处理convolve
。
为什么convolve
这么慢呢?因为它有很多额外的开销。numpy
/scipy
通常都是这样,适合处理大数据集。如果你知道你的数组会保持小的尺寸,那就直接用Cython重新实现convolve
。
哦,尽量使用缓冲区语法。对于二维数组用DTYPE[:, :]
,对于一维数组用DTYPE[:]
,等等。这是内存视图协议,效果要好得多。虽然在某些情况下它的开销会更大,但通常可以找到解决办法,而且在大多数其他方面都要好得多。
编辑:
你可以尝试(递归地)内联scipy
函数:
import numpy as np
from scipy.signal.sigtools import _correlateND
def calcM(filtertaps, n, m, keep_indices, V):
k = keep_indices.shape[1]
M = np.zeros((n, m), dtype=np.float32)
ej = np.zeros((m,), dtype=np.float32)
slice_obj = [slice(None, None, -1)] * len(filtertaps.shape)
sliced_filtertaps_view = filtertaps[slice_obj]
ps = ej.shape[0] + sliced_filtertaps_view.shape[0] - 1
in1zpadded = np.zeros(ps, ej.dtype)
out = np.empty(ps, ej.dtype)
for rowidx in range(n):
for colidx in range(m):
in1zpadded[colidx] = 1
convolution = _correlateND(in1zpadded, sliced_filtertaps_view, out, 2)
for kidx in range(k):
M[rowidx, colidx] += convolution[keep_indices[rowidx, kidx]] * V[rowidx, kidx]
in1zpadded[colidx] = 0
return M
注意,这使用了私有实现细节。
这个方法是针对特定维度定制的,所以我不确定它是否适用于你的实际数据。但它能去掉绝大部分的开销。然后你可以通过再次设置类型来进一步优化:
import numpy as np
cimport numpy as np
from scipy.signal.sigtools import _correlateND
DTYPE=np.float32
ctypedef np.float32_t DTYPE_t
def calcM(filtertaps, int n, int m, np.int_t[:, :] t_keep_indices, DTYPE_t[:, :] t_V):
cdef int rowidx, colidx, kidx, k
cdef DTYPE_t[:, :] t_M
cdef DTYPE_t[:] t_in1zpadded, t_convolution
k = t_keep_indices.shape[1]
t_M = M = np.zeros((n, m), dtype=np.float32)
ej = np.zeros((m,), dtype=np.float32)
slice_obj = [slice(None, None, -1)] * len(filtertaps.shape)
sliced_filtertaps_view = filtertaps[slice_obj]
ps = ej.shape[0] + sliced_filtertaps_view.shape[0] - 1
t_in1zpadded = in1zpadded = np.zeros(ps, ej.dtype)
out = np.empty(ps, ej.dtype)
for rowidx in range(n):
for colidx in range(m):
t_in1zpadded[colidx] = 1
t_convolution = _correlateND(in1zpadded, sliced_filtertaps_view, out, 2)
for kidx in range(k):
t_M[rowidx, colidx] += t_convolution[<int>t_keep_indices[rowidx, kidx]] * t_V[rowidx, kidx]
t_in1zpadded[colidx] = 0
return M
这样速度提升超过10倍,但还达不到你理想中的估算。不过,那个估算本身就有点不靠谱;)