意外缓慢的cython卷积代码

3 投票
1 回答
1265 浏览
提问于 2025-04-18 06:28

我需要快速计算一个矩阵,这个矩阵的每个元素是通过将一个滤波器和一个向量进行卷积得到的。然后我会对得到的向量进行下采样,最后再和另一个向量做点积。具体来说,我想计算

M = [conv(e_j, f)*P_i*v_i ]_{i,j},

其中 i 从 1 到 n,j 从 1 到 m。这里的 e_j 是一个大小为 n 的指示向量,只有在第 j 列有一个 1,f 是长度为 s 的滤波器,P_i 是一个 (n+s-1)-行 k 列的矩阵,用来从卷积结果中取出合适的 k 个元素,而 v_i 是一个长度为 k 的列向量。

计算 M 的每个元素需要 O(n*s) 次操作,所以计算整个 M 需要 O(n*s*n*m) 次操作。比如当 n=6,m=7,s=3 时,我电脑的一个核心(8GLOPs)应该能在大约 0.094 微秒内计算出 M。然而,我的一个非常简单的 Cython 实现,按照 Cython 文档中的示例,却需要超过 2 毫秒来计算这些参数的示例。这差不多有 4 个数量级的差别!

这里有一个包含 Cython 实现和测试代码的 shar 文件。你可以把它复制粘贴到一个文件中,然后在一个干净的目录下运行 'bash <fname>' 来获取代码,然后运行 'bash ./test.sh' 来看看性能有多糟糕。

cat > fastcalcM.pyx <<'EOF'

import numpy as np
cimport numpy as np
cimport cython
from scipy.signal import convolve

DTYPE=np.float32
ctypedef np.float32_t DTYPE_t

@cython.boundscheck(False)
def calcM(np.ndarray[DTYPE_t, ndim=1, negative_indices=False] filtertaps, int
        n, int m, np.ndarray[np.int_t, ndim=2, negative_indices=False]
        keep_indices, np.ndarray[DTYPE_t, ndim=2, negative_indices=False] V):
    """ Computes a numrows-by-k matrix M whose entries satisfy
        M_{i,k} = [conv(e_j, f)^T * P_i * v_i],
        where v_i^T is the i-th row of V, and P_i samples the entries from
        conv(e_j, f)^T indicated by the ith row of the keep_indices matrix """

    cdef int k = keep_indices.shape[1]

    cdef np.ndarray M = np.zeros((n, m), dtype=DTYPE)
    cdef np.ndarray ej = np.zeros((m,), dtype=DTYPE)
    cdef np.ndarray convolution
    cdef int rowidx, colidx, kidx

    for rowidx in range(n):
        for colidx in range(m):
            ej[colidx] = 1
            convolution = convolve(ej, filtertaps, mode='full')
            for kidx in range(k):
                M[rowidx, colidx] += convolution[keep_indices[rowidx, kidx]] * V[rowidx, kidx]
            ej[colidx] = 0

    return M

EOF
#-----------------------------------------------------------------------------
cat > test_calcM.py << 'EOF'

import numpy as np
from fastcalcM import calcM

filtertaps = np.array([-1, 2, -1]).astype(np.float32)
n, m = 6, 7
keep_indices = np.array([[1, 3], 
                         [4, 5],
                         [2, 2], 
                         [5, 5], 
                         [3, 4], 
                         [4, 5]]).astype(np.int)
V = np.random.random_integers(-5, 5, size=(6, 2)).astype(np.float32)

print calcM(filtertaps, n, m, keep_indices, V)

EOF
#-----------------------------------------------------------------------------
cat > test.sh << 'EOF'

python setup.py build_ext --inplace
echo -e "%run test_calcM\n%timeit calcM(filtertaps, n, m, keep_indices, V)" > script.ipy
ipython script.ipy

EOF
#-----------------------------------------------------------------------------
cat > setup.py << 'EOF'

from distutils.core import setup
from Cython.Build import cythonize
import numpy

setup(
    name="Fast convolutions",
    include_dirs = [numpy.get_include()],
    ext_modules = cythonize("fastcalcM.pyx")
)

EOF

我想也许是调用 scipy 的卷积函数导致了这个问题(我不确定 Cython 和 scipy 是否兼容),所以我按照 Cython 文档中的示例实现了自己的卷积代码,但结果是整体代码变得慢了大约 10 倍。

有没有什么想法可以让我更接近理论上可能的速度,或者为什么差距会这么大?

1 个回答

4

首先,Megconvolution的类型设置让快速索引变得困难。其实你做的这些类型设置并没有太大帮助。

不过这也没关系,因为你有两个额外的开销。第一个是Cython和Python类型之间的转换。如果你想经常把数组传给Python,最好保持无类型的数组,这样就不用每次都转换。把这个直接移到Python里就能加快速度(从1毫秒变成0.65微秒)。

然后我进行了性能分析:

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    15                                           def calcM(filtertaps, n, m, keep_indices, V):
    16      4111         3615      0.9      0.1      k = keep_indices.shape[1]
    17      4111         8024      2.0      0.1      M = np.zeros((n, m), dtype=np.float32)
    18      4111         6090      1.5      0.1      ej = np.zeros((m,), dtype=np.float32)
    19                                           
    20     28777        18690      0.6      0.3      for rowidx in range(n):
    21    197328       123284      0.6      2.2          for colidx in range(m):
    22    172662       112348      0.7      2.0              ej[colidx] = 1
    23    172662      4076225     23.6     73.6              convolution = convolve(ej, filtertaps, mode='full')
    24    517986       395513      0.8      7.1              for kidx in range(k):
    25    345324       668309      1.9     12.1                  M[rowidx, colidx] += convolution[keep_indices[rowidx, kidx]] * V[rowidx, kidx]
    26    172662       120271      0.7      2.2              ej[colidx] = 0
    27                                           
    28      4111         2374      0.6      0.0      return M

在考虑其他问题之前,先处理convolve

为什么convolve这么慢呢?因为它有很多额外的开销。numpy/scipy通常都是这样,适合处理大数据集。如果你知道你的数组会保持小的尺寸,那就直接用Cython重新实现convolve

哦,尽量使用缓冲区语法。对于二维数组用DTYPE[:, :],对于一维数组用DTYPE[:],等等。这是内存视图协议,效果要好得多。虽然在某些情况下它的开销会更大,但通常可以找到解决办法,而且在大多数其他方面都要好得多。


编辑:

你可以尝试(递归地)内联scipy函数:

import numpy as np
from scipy.signal.sigtools import _correlateND

def calcM(filtertaps, n, m, keep_indices, V):
    k = keep_indices.shape[1]
    M = np.zeros((n, m), dtype=np.float32)
    ej = np.zeros((m,), dtype=np.float32)

    slice_obj = [slice(None, None, -1)] * len(filtertaps.shape)
    sliced_filtertaps_view = filtertaps[slice_obj]

    ps = ej.shape[0] + sliced_filtertaps_view.shape[0] - 1
    in1zpadded = np.zeros(ps, ej.dtype)
    out = np.empty(ps, ej.dtype)

    for rowidx in range(n):
        for colidx in range(m):
            in1zpadded[colidx] = 1

            convolution = _correlateND(in1zpadded, sliced_filtertaps_view, out, 2)

            for kidx in range(k):
                M[rowidx, colidx] += convolution[keep_indices[rowidx, kidx]] * V[rowidx, kidx]

            in1zpadded[colidx] = 0

    return M

注意,这使用了私有实现细节。

这个方法是针对特定维度定制的,所以我不确定它是否适用于你的实际数据。但它能去掉绝大部分的开销。然后你可以通过再次设置类型来进一步优化:

import numpy as np
cimport numpy as np
from scipy.signal.sigtools import _correlateND

DTYPE=np.float32
ctypedef np.float32_t DTYPE_t

def calcM(filtertaps, int n, int m, np.int_t[:, :] t_keep_indices, DTYPE_t[:, :] t_V):
    cdef int rowidx, colidx, kidx, k
    cdef DTYPE_t[:, :] t_M
    cdef DTYPE_t[:] t_in1zpadded, t_convolution

    k = t_keep_indices.shape[1]
    t_M = M = np.zeros((n, m), dtype=np.float32)
    ej = np.zeros((m,), dtype=np.float32)

    slice_obj = [slice(None, None, -1)] * len(filtertaps.shape)
    sliced_filtertaps_view = filtertaps[slice_obj]

    ps = ej.shape[0] + sliced_filtertaps_view.shape[0] - 1
    t_in1zpadded = in1zpadded = np.zeros(ps, ej.dtype)
    out = np.empty(ps, ej.dtype)

    for rowidx in range(n):
        for colidx in range(m):
            t_in1zpadded[colidx] = 1

            t_convolution = _correlateND(in1zpadded, sliced_filtertaps_view, out, 2)

            for kidx in range(k):
                t_M[rowidx, colidx] += t_convolution[<int>t_keep_indices[rowidx, kidx]] * t_V[rowidx, kidx]

            t_in1zpadded[colidx] = 0

    return M

这样速度提升超过10倍,但还达不到你理想中的估算。不过,那个估算本身就有点不靠谱;)

撰写回答