一侧填充的两个三维数组卷积过慢

12 投票
3 回答
3785 浏览
提问于 2025-04-17 15:20

在我现在的项目中,我需要以一种稍微不同寻常的方式来“卷积”两个三维数组。

假设我们有两个三维数组 A 和 B,它们的尺寸分别是 dimA 和 dimB(每个轴的尺寸都是一样的)。现在我们想要创建一个第三个数组 C,它的每个轴的尺寸是 dimA 和 dimB 的和。

数组 C 的每个元素是这样计算的:

c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}

我现在的版本很简单明了:

dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA+dimB

C = np.zeros((dimC,dimC,dimC))
for x1 in range(dimA):
    for x2 in range(dimB):
        for y1 in range(dimA):
            for y2 in range(dimB):
                for z1 in range(dimA):
                    for z2 in range(dimB):
                        x = x1+x2
                        y = y1+y2
                        z = z1+z2
                        C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2] 

可惜,这个版本运行得非常慢,根本无法使用。

我第二个版本是:

C = scipy.signal.fftconvolve(A,B,mode="full")

但是这个版本只计算了 max(dimA,dimB) 的元素。

有没有人有更好的主意呢?

3 个回答

3

上面提到的Numba方法是个不错的技巧,但只对相对较小的N有优势。一个O(N^6)的算法无论实现得多快,都会让你受不了。在我的测试中,fftconvolve方法在N=20时开始变得更快,到N=32时速度是之前的10倍。这里不包括上面提到的custom_convolution的定义:

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

if __name__ == '__main__':
    reps = 3
    nt, ft = [], []
    x = range(2, 34)
    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)
        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        assert np.allclose(C1[:-1, :-1, :-1], C2)
        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
    plt.plot(x, ft, x, nt)
    plt.show()

这个方法也很依赖于N,因为FFT在2的幂次上会快很多。对于N=17到N=32,FFT版本的时间基本上是恒定的,N=33时仍然更快,但之后又会迅速变慢。

你可以尝试把FFT的实现用Numba包裹起来,但直接用scipy的版本是做不到的。

(抱歉要发一个新回答,但我没有足够的积分直接回复。)

4

一般来说,做事情要用合适的算法,特别是当卷积核相对于数据比较短的时候,这时候就可以使用基于快速傅里叶变换(FFT)的卷积。这里的“短”大概是指长度小于 log2(n),其中 n 是数据的长度。

在你的情况中,因为你要对两个相同大小的数据集进行卷积,所以你可能想考虑使用基于 FFT 的卷积。

很明显,scipy.signal.fftconvolve 在这方面有点不足。通过使用更快的 FFT 算法,你可以自己写一个卷积程序,这样效果会更好(fftconvolve 强制要求变换的大小是2的幂,这也让事情变得复杂)。

下面的代码使用了 pyfftw,这是我对 FFTW 的封装,并创建了一个自定义的卷积类 CustomFFTConvolution

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

使用方法如下:

custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction

在构造这个类的时候,可以选择性地添加一个 threads 参数。创建这个类的目的是为了利用 FFTW 提前规划变换的能力。

下面的完整演示代码只是扩展了 @Kelsey 的回答,增加了计时等功能。

与 numba 解决方案和普通的 fftconvolve 解决方案相比,速度提升非常明显。对于 n = 33 的情况,它的速度大约比这两者快 40-45 倍。

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

def run_test():
    reps = 10
    nt, ft, cft, cft2 = [], [], [], []
    x = range(2, 34)

    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)

        custom_fft_conv = CustomFFTConvolution(A, B)
        custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)

        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        C3 = custom_fft_conv(A, B)
        C4 = custom_fft_conv_nthreads(A, B)

        assert np.allclose(C1[:-1, :-1, :-1], C2)
        assert np.allclose(C1[:-1, :-1, :-1], C3)
        assert np.allclose(C1[:-1, :-1, :-1], C4)

        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv(A, B))
        cft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv_nthreads(A, B))
        cft2.append(t.timeit(number=reps))

    plt.plot(x, ft, label='scipy.signal.fftconvolve')
    plt.plot(x, nt, label='custom numba convolve')
    plt.plot(x, cft, label='custom pyfftw convolve')
    plt.plot(x, cft2, label='custom pyfftw convolve with threading')        
    plt.legend()
    plt.show()

if __name__ == '__main__':
    run_test()

编辑: 更新后的 scipy 在处理时不总是填充到2的幂长度,所以它的输出更接近于 pyFFTW 的结果。

5

你试过用 Numba 吗?这是一个可以让你加速通常比较慢的 Python 代码的工具,它使用了一种叫做 JIT 编译器的技术。我快速尝试了一下你的问题,发现用 Numba 处理后速度提升很大。通过 IPython 的 timeit 魔法功能,custom_convolution 函数大约花了 18 秒,而用 Numba 优化后的函数只用了 10.4 毫秒。这是一个超过 1700 倍的速度提升

下面是如何使用 Numba 的示例。

import numpy as np
from numba import jit, double

s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

如果你计算两个函数结果之间的差异,你会发现结果是零。这意味着 JIT 实现没有任何问题。

slow_result = custom_convolution(array_a, array_b) 
fast_result = fast_convolution(array_a, array_b)

print np.max(np.abs(slow_result - fast_result))

我得到的输出是 0.0

你可以把 Numba 安装到你当前的 Python 环境中,或者通过 AnacondaCE 快速尝试一下。

最后,Numba 的函数比 scipy.signal.fftconvolve 函数快好几倍。

注意:我使用的是 Anaconda,而不是 AnacondaCE。这两个版本在 Numba 的性能上有一些差异,但我觉得差别不会太大。

撰写回答