用Numpy和Cython加速距离矩阵计算

9 投票
1 回答
3462 浏览
提问于 2025-04-18 16:37

考虑一个维度为 NxM 的 numpy 数组 A。我们的目标是计算一个欧几里得距离矩阵 D,其中每个元素 D[i,j] 表示第 i 行和第 j 行之间的欧几里得距离。有没有什么快速的方法可以做到这一点?虽然这不是我需要解决的具体问题,但它很好地展示了我想做的事情(一般来说,也可以使用其他距离度量)。

这是我目前想到的最快的方法:

n = A.shape[0]
D = np.empty((n,n))
for i in range(n):
    D[i] = np.sqrt(np.square(A-A[i]).sum(1))

但这真的是最快的方法吗?我主要担心的是 for 循环。我们能不能用 Cython 来提高速度呢?

为了避免循环,我尝试使用广播,做了类似这样的操作:

D = np.sqrt(np.square(A[np.newaxis,:,:]-A[:,np.newaxis,:]).sum(2))

但结果证明这是个坏主意,因为在构建一个维度为 NxNxM 的中间 3D 数组时会有一些额外的开销,所以性能反而更差。

我尝试了 Cython,但我对 Cython 还是个新手,所以不太确定我的尝试效果如何:

def dist(np.ndarray[np.int32_t, ndim=2] A):
    cdef int n = A.shape[0]    
    cdef np.ndarray[np.float64_t, ndim=2] dm = np.empty((n,n), dtype=np.float64)      
    cdef int i = 0    
    for i in range(n):  
        dm[i] = np.sqrt(np.square(A-A[i]).sum(1)).astype(np.float64)              
    return dm 

上面的代码比 Python 的 for 循环还要慢。我对 Cython 了解不多,但我认为我至少可以达到和 for 循环 + numpy 一样的性能。我在想,是否有可能通过正确的方法实现明显的性能提升?或者有没有其他方法可以加速这个过程(不涉及并行计算)?

1 个回答

10

Cython的关键在于尽量避免使用Python对象和函数调用,包括对numpy数组的向量化操作。这通常意味着需要手动写出所有的循环,并一次处理一个数组元素。

这里有一个非常有用的教程,讲解了如何将numpy代码转换为Cython并进行优化。

下面是一个更优化的Cython版本的距离函数:

import numpy as np
cimport numpy as np
cimport cython

# don't use np.sqrt - the sqrt function from the C standard library is much
# faster
from libc.math cimport sqrt

# disable checks that ensure that array indices don't go out of bounds. this is
# faster, but you'll get a segfault if you mess up your indexing.
@cython.boundscheck(False)
# this disables 'wraparound' indexing from the end of the array using negative
# indices.
@cython.wraparound(False)
def dist(double [:, :] A):

    # declare C types for as many of our variables as possible. note that we
    # don't necessarily need to assign a value to them at declaration time.
    cdef:
        # Py_ssize_t is just a special platform-specific type for indices
        Py_ssize_t nrow = A.shape[0]
        Py_ssize_t ncol = A.shape[1]
        Py_ssize_t ii, jj, kk

        # this line is particularly expensive, since creating a numpy array
        # involves unavoidable Python API overhead
        np.ndarray[np.float64_t, ndim=2] D = np.zeros((nrow, nrow), np.double)

        double tmpss, diff

    # another advantage of using Cython rather than broadcasting is that we can
    # exploit the symmetry of D by only looping over its upper triangle
    for ii in range(nrow):
        for jj in range(ii + 1, nrow):
            # we use tmpss to accumulate the SSD over each pair of rows
            tmpss = 0
            for kk in range(ncol):
                diff = A[ii, kk] - A[jj, kk]
                tmpss += diff * diff
            tmpss = sqrt(tmpss)
            D[ii, jj] = tmpss
            D[jj, ii] = tmpss  # because D is symmetric

    return D

我把这个保存到了一个叫做fastdist.pyx的文件里。我们可以用pyximport来简化构建过程:

import pyximport
pyximport.install()
import fastdist
import numpy as np

A = np.random.randn(100, 200)

D1 = np.sqrt(np.square(A[np.newaxis,:,:]-A[:,np.newaxis,:]).sum(2))
D2 = fastdist.dist(A)

print np.allclose(D1, D2)
# True

这样就能运行了,至少是这样。接下来我们用%timeit这个魔法命令来做一些基准测试:

%timeit np.sqrt(np.square(A[np.newaxis,:,:]-A[:,np.newaxis,:]).sum(2))
# 100 loops, best of 3: 10.6 ms per loop

%timeit fastdist.dist(A)
# 100 loops, best of 3: 1.21 ms per loop

大约9倍的速度提升不错,但也不是特别惊人。不过,正如你所说,广播方法的一个大问题是构建中间数组所需的内存。

A2 = np.random.randn(1000, 2000)
%timeit fastdist.dist(A2)
# 1 loops, best of 3: 1.36 s per loop

我不建议使用广播来尝试这个...

我们还可以做的另一件事是对最外层的循环进行并行处理,使用prange函数:

from cython.parallel cimport prange

...

for ii in prange(nrow, nogil=True, schedule='guided'):
...

为了编译这个并行版本,你需要告诉编译器启用OpenMP。我还没弄明白如何用pyximport做到这一点,但如果你使用gcc,可以手动这样编译:

$ cython fastdist.pyx
$ gcc -shared -pthread -fPIC -fwrapv -fopenmp -O3 \
   -Wall -fno-strict-aliasing  -I/usr/include/python2.7 -o fastdist.so fastdist.c

使用8个线程进行并行处理:

%timeit D2 = fastdist.dist_parallel(A2)
1 loops, best of 3: 509 ms per loop

撰写回答