一侧填充的两个三维数组卷积过慢
在我现在的项目中,我需要以一种稍微不同寻常的方式来“卷积”两个三维数组。
假设我们有两个三维数组 A 和 B,它们的尺寸分别是 dimA 和 dimB(每个轴的尺寸都是一样的)。现在我们想要创建一个第三个数组 C,它的每个轴的尺寸是 dimA 和 dimB 的和。
数组 C 的每个元素是这样计算的:
c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}
我现在的版本很简单明了:
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA+dimB
C = np.zeros((dimC,dimC,dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1+x2
y = y1+y2
z = z1+z2
C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2]
可惜,这个版本运行得非常慢,根本无法使用。
我第二个版本是:
C = scipy.signal.fftconvolve(A,B,mode="full")
但是这个版本只计算了 max(dimA,dimB)
的元素。
有没有人有更好的主意呢?
3 个回答
上面提到的Numba方法是个不错的技巧,但只对相对较小的N有优势。一个O(N^6)的算法无论实现得多快,都会让你受不了。在我的测试中,fftconvolve
方法在N=20时开始变得更快,到N=32时速度是之前的10倍。这里不包括上面提到的custom_convolution
的定义:
from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
def fft_convolution(A, B):
return fftconvolve(A, B, mode='full')
if __name__ == '__main__':
reps = 3
nt, ft = [], []
x = range(2, 34)
for N in x:
print N
A = np.random.rand(N, N, N)
B = np.random.rand(N, N, N)
C1 = numba_convolution(A, B)
C2 = fft_convolution(A, B)
assert np.allclose(C1[:-1, :-1, :-1], C2)
t = Timer(lambda: numba_convolution(A, B))
nt.append(t.timeit(number=reps))
t = Timer(lambda: fft_convolution(A, B))
ft.append(t.timeit(number=reps))
plt.plot(x, ft, x, nt)
plt.show()
这个方法也很依赖于N,因为FFT在2的幂次上会快很多。对于N=17到N=32,FFT版本的时间基本上是恒定的,N=33时仍然更快,但之后又会迅速变慢。
你可以尝试把FFT的实现用Numba包裹起来,但直接用scipy的版本是做不到的。
(抱歉要发一个新回答,但我没有足够的积分直接回复。)
一般来说,做事情要用合适的算法,特别是当卷积核相对于数据比较短的时候,这时候就可以使用基于快速傅里叶变换(FFT)的卷积。这里的“短”大概是指长度小于 log2(n),其中 n 是数据的长度。
在你的情况中,因为你要对两个相同大小的数据集进行卷积,所以你可能想考虑使用基于 FFT 的卷积。
很明显,scipy.signal.fftconvolve
在这方面有点不足。通过使用更快的 FFT 算法,你可以自己写一个卷积程序,这样效果会更好(fftconvolve 强制要求变换的大小是2的幂,这也让事情变得复杂)。
下面的代码使用了 pyfftw,这是我对 FFTW 的封装,并创建了一个自定义的卷积类 CustomFFTConvolution
:
class CustomFFTConvolution(object):
def __init__(self, A, B, threads=1):
shape = (np.array(A.shape) + np.array(B.shape))-1
if np.iscomplexobj(A) and np.iscomplexobj(B):
self.fft_A_obj = pyfftw.builders.fftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.fftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.ifftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
else:
self.fft_A_obj = pyfftw.builders.rfftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.rfftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.irfftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
def __call__(self, A, B):
fft_padded_A = self.fft_A_obj(A)
fft_padded_B = self.fft_B_obj(B)
return self.ifft_obj(fft_padded_A * fft_padded_B)
使用方法如下:
custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction
在构造这个类的时候,可以选择性地添加一个 threads
参数。创建这个类的目的是为了利用 FFTW 提前规划变换的能力。
下面的完整演示代码只是扩展了 @Kelsey 的回答,增加了计时等功能。
与 numba 解决方案和普通的 fftconvolve 解决方案相比,速度提升非常明显。对于 n = 33 的情况,它的速度大约比这两者快 40-45 倍。
from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw
# Original code
def custom_convolution(A, B):
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA + dimB
C = np.zeros((dimC, dimC, dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1 + x2
y = y1 + y2
z = z1 + z2
C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
return C
# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
def fft_convolution(A, B):
return fftconvolve(A, B, mode='full')
class CustomFFTConvolution(object):
def __init__(self, A, B, threads=1):
shape = (np.array(A.shape) + np.array(B.shape))-1
if np.iscomplexobj(A) and np.iscomplexobj(B):
self.fft_A_obj = pyfftw.builders.fftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.fftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.ifftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
else:
self.fft_A_obj = pyfftw.builders.rfftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.rfftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.irfftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
def __call__(self, A, B):
fft_padded_A = self.fft_A_obj(A)
fft_padded_B = self.fft_B_obj(B)
return self.ifft_obj(fft_padded_A * fft_padded_B)
def run_test():
reps = 10
nt, ft, cft, cft2 = [], [], [], []
x = range(2, 34)
for N in x:
print N
A = np.random.rand(N, N, N)
B = np.random.rand(N, N, N)
custom_fft_conv = CustomFFTConvolution(A, B)
custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)
C1 = numba_convolution(A, B)
C2 = fft_convolution(A, B)
C3 = custom_fft_conv(A, B)
C4 = custom_fft_conv_nthreads(A, B)
assert np.allclose(C1[:-1, :-1, :-1], C2)
assert np.allclose(C1[:-1, :-1, :-1], C3)
assert np.allclose(C1[:-1, :-1, :-1], C4)
t = Timer(lambda: numba_convolution(A, B))
nt.append(t.timeit(number=reps))
t = Timer(lambda: fft_convolution(A, B))
ft.append(t.timeit(number=reps))
t = Timer(lambda: custom_fft_conv(A, B))
cft.append(t.timeit(number=reps))
t = Timer(lambda: custom_fft_conv_nthreads(A, B))
cft2.append(t.timeit(number=reps))
plt.plot(x, ft, label='scipy.signal.fftconvolve')
plt.plot(x, nt, label='custom numba convolve')
plt.plot(x, cft, label='custom pyfftw convolve')
plt.plot(x, cft2, label='custom pyfftw convolve with threading')
plt.legend()
plt.show()
if __name__ == '__main__':
run_test()
编辑: 更新后的 scipy 在处理时不总是填充到2的幂长度,所以它的输出更接近于 pyFFTW 的结果。
你试过用 Numba 吗?这是一个可以让你加速通常比较慢的 Python 代码的工具,它使用了一种叫做 JIT 编译器的技术。我快速尝试了一下你的问题,发现用 Numba 处理后速度提升很大。通过 IPython 的 timeit
魔法功能,custom_convolution
函数大约花了 18 秒,而用 Numba 优化后的函数只用了 10.4 毫秒。这是一个超过 1700 倍的速度提升。
下面是如何使用 Numba 的示例。
import numpy as np
from numba import jit, double
s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)
# Original code
def custom_convolution(A, B):
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA + dimB
C = np.zeros((dimC, dimC, dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1 + x2
y = y1 + y2
z = z1 + z2
C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
return C
# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
如果你计算两个函数结果之间的差异,你会发现结果是零。这意味着 JIT 实现没有任何问题。
slow_result = custom_convolution(array_a, array_b)
fast_result = fast_convolution(array_a, array_b)
print np.max(np.abs(slow_result - fast_result))
我得到的输出是 0.0
。
你可以把 Numba 安装到你当前的 Python 环境中,或者通过 AnacondaCE 快速尝试一下。
最后,Numba 的函数比 scipy.signal.fftconvolve
函数快好几倍。
注意:我使用的是 Anaconda,而不是 AnacondaCE。这两个版本在 Numba 的性能上有一些差异,但我觉得差别不会太大。