向量化距离计算的高效方法

2024-04-20 05:14:27 发布

您现在位置:Python中文网/ 问答频道 /正文

在我的研究中,我必须实现两组向量之间的成对距离L1距离计算,每一组向量都表示为NumPy矩阵(向量是行)。这必须使用两个循环、一个循环和无循环来完成。我预计,由于NumPy在向量化方面非常出色,所以算法的排序必须是两个循环慢于一个循环比没有循环慢。在

我写了函数:

def f_cdist_2(X1, X2):
    res = np.zeros(shape=(X1.shape[0], X2.shape[0]), dtype=np.float64)

    for ix1 in range(X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            res[ix1, ix2] = np.abs(X1[ix1, :] - X2[ix2, :]).sum()

    return res


def f_cdist_1(X1, X2):
    res = np.zeros(shape=(X1.shape[0], X2.shape[0]), dtype=np.float64)

    for ix1 in range(X1.shape[0]):
        res[ix1, :] = np.abs(np.tile(X1[ix1, :], (X2.shape[0], 1)) - X2).sum(axis=1)

    return res


def f_cdist_0(X1, X2):
    res = np.abs(
            np.tile(X1[:, :, np.newaxis], (1, 1, X2.shape[0])) - \
            np.tile(X2.T[np.newaxis, :, :], (X1.shape[0], 1, 1))
    ).sum(axis=1)

    return res

然后我用128 x 512和256 x 512形状的两个随机矩阵测试了性能,基于100次运行,我得到了以下结果:

  1. 两个循环:156毫秒

  2. 一个循环:32毫秒

  3. 无循环:135毫秒

我还尝试了cdist中的scipy.spatial.distance,得到了最好的性能:9毫秒。在

现在,有没有更好的方法来实现无循环功能?我希望它的性能至少和一个循环一样好,但现在我不知道如何改进它。在

更新

使用kwinkunks的no-loops实现方法,我在矩阵1024x1024上重新运行了测试(又进行了100次测试),结果如下:

  1. 两个循环:5.7秒

  2. 一个循环:6.6秒

  3. 无循环:3.9秒

  4. scipy.spatial.distance.cdist:0.6秒

所以在更大的矩阵上,无循环实现确实更好。scipy创造了奇迹,但如果我理解正确的话,它是用C编写的,因此性能非常好。在

更新

尝试使用4096 x 1024个矩阵np.float64,相同的设置:

  1. 两个循环:88秒

  2. 一个循环:66秒

  3. 无循环:内存不足(目前有大约18 Gb的可用RAM)

  4. scipy.spatial.distance.cdist:13秒


Tags: infordefnpres矩阵scipy性能
3条回答

您可以使用Pythran从矢量化版本获得额外的加速

f_距离py公司名称:

import numpy as np
#pythran export f_dist(float64[:,:], float64[:,:])
def f_dist(X1, X2):
    return np.sum(np.abs(X1[:, None, :] - X2), axis=-1)

在我的笔记本电脑上,原始版本运行在:

^{pr2}$

编译内核后:

> pythran f_dist.py

您可以对其进行基准测试:

> python -m timeit -s 'from f_dist import f_dist; from numpy.random import random; x = random((100,100)); y = random((100,100))' 'f_dist(x, y)'
1000 loops, best of 3: 1.21 msec per loop

使用SIMD指令可进一步加快计算速度:

> pythran f_dist.py -DUSE_XSIMD -march=native
> python -m timeit -s 'from f_dist import f_dist; from numpy.random import random; x = random((100,100)); y = random((100,100))' 'f_dist(x, y)'
1000 loops, best of 3: 774 usec per loop

免责声明:我是pythran项目的核心开发人员。在

使用Numba的解决方案

  • 并行化(在非常小的示例中,例如(24x24),由于创建线程的开销,并行化版本将较慢)
  • 内环是SIMD矢量化的

Exmaple公司

import numpy as np
import numba as nb

#Debug output for SIMD-vectorization
import llvmlite.binding as llvm
llvm.set_option('', ' debug-only=loop-vectorize')
########################################

#Your solution
#You can also use Numba on this, but apart from parallization
#it is often better to write out the inner loop
def f_cdist(X1, X2):
    res = np.zeros(shape=(X1.shape[0], X2.shape[0]), dtype=np.float64)

    for ix1 in range(X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            res[ix1, ix2] = np.abs(X1[ix1, :] - X2[ix2, :]).sum()

    return res

@nb.njit(fastmath=True,parallel=True)
def f_cdist_nb(X1, X2):
    #Some safety, becuase there is no bounds-checking
    assert X1.shape[1]==X2.shape[1]
    res = np.empty(shape=(X1.shape[0], X2.shape[0]), dtype=X1.dtype)

    for ix1 in nb.prange(X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            #Writing out the inner loop often leads to better performance
            sum=0.
            for i in range(X1.shape[1]):
                sum+=np.abs(X1[ix1, i] - X2[ix2, i])
            res[ix1, ix2] = sum

    return res

性能

^{pr2}$

编辑:手工优化的Numba版本

#Unroll and Jam loops
@nb.njit(fastmath=True,parallel=True)
def f_cdist_nb_3(X1, X2):
    assert X1.shape[1]==X2.shape[1]
    res = np.empty(shape=(X1.shape[0], X2.shape[0]), dtype=X1.dtype)

    for ix1 in nb.prange(X1.shape[0]//4):
        for ix2 in range(X2.shape[0]//4):
            sum_1,sum_2,sum_3,sum_4,sum_5,sum_6   =0.,0.,0.,0.,0.,0.
            sum_7,sum_8,sum_9,sum_10,sum_11,sum_12=0.,0.,0.,0.,0.,0.
            sum_13,sum_14,sum_15,sum_16=0.,0.,0.,0.

            for i in range(X1.shape[1]):
                sum_1+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+0, i])
                sum_2+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+1, i])
                sum_3+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+2, i])
                sum_4+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+3, i])
                sum_5+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+0, i])
                sum_6+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+1, i])
                sum_7+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+2, i])
                sum_8+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+3, i])
                sum_9+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+0, i])
                sum_10+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+1, i])
                sum_11+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+2, i])
                sum_12+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+3, i])
                sum_13+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+0, i])
                sum_14+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+1, i])
                sum_15+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+2, i])
                sum_16+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+3, i])

            res[ix1*4+0, ix2*4+0] = sum_1
            res[ix1*4+0, ix2*4+1] = sum_2
            res[ix1*4+0, ix2*4+2] = sum_3
            res[ix1*4+0, ix2*4+3] = sum_4
            res[ix1*4+1, ix2*4+0] = sum_5
            res[ix1*4+1, ix2*4+1] = sum_6
            res[ix1*4+1, ix2*4+2] = sum_7
            res[ix1*4+1, ix2*4+3] = sum_8
            res[ix1*4+2, ix2*4+0] = sum_9
            res[ix1*4+2, ix2*4+1] = sum_10
            res[ix1*4+2, ix2*4+2] = sum_11
            res[ix1*4+2, ix2*4+3] = sum_12
            res[ix1*4+3, ix2*4+0] = sum_13
            res[ix1*4+3, ix2*4+1] = sum_14
            res[ix1*4+3, ix2*4+2] = sum_15
            res[ix1*4+3, ix2*4+3] = sum_16

    #Rest of the loop
    for ix1 in range(X1.shape[0]//4*4,X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            sum_1=0.
            for i in range(X1.shape[1]):
                sum_1+=np.abs(X1[ix1, i] - X2[ix2, i])
            res[ix1, ix2] = sum_1

    for ix1 in range(X1.shape[0]):
        for ix2 in range(X2.shape[0]//4*4,X2.shape[0]):
            sum_1=0.
            for i in range(X1.shape[1]):
                sum_1+=np.abs(X1[ix1, i] - X2[ix2, i])
            res[ix1, ix2] = sum_1
    return res

计时

#4096x1024    
X1=np.random.rand(4096,1024)
X2=np.random.rand(4096,1024)

res1=f_cdist_nb(X1,X2)
res2=f_cdist_nb_3(X1,X2)
res3=spatial.distance.cdist(X1, X2, 'cityblock')

#Check the results
print(np.allclose(res1,res2))
print(np.allclose(res1,res3))

%timeit res1=f_cdist_nb(X1,X2)
1.6 s ± 199 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res2=f_cdist_nb_3(X1,X2)
497 ms ± 50.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res3=spatial.distance.cdist(X1, X2, 'cityblock')
17.7 s ± 118 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

您可以避免使用NumPy的广播进行平铺等:

def f_dist(X1, X2):
    return np.sum(np.abs(X1[:, None, :] - X2), axis=-1)

但是,令人惊讶的是(不管怎样)它并没有比循环快(我的机器上大约90毫秒,而你的^{cd1>}函数的24毫秒)。

那个广播技巧通常很有用。这意味着你可以做这样的事情:

^{pr2}$

相关问题 更多 >