使用步幅实现高效移动平均滤波器

30 投票
4 回答
36787 浏览
提问于 2025-04-16 11:28

我最近了解到在 strides 的概念,具体是在 这个帖子 的回答中提到的。我在想,如何能更高效地使用它们来计算移动平均滤波器,而不是我在 这个帖子 中提到的使用卷积滤波器的方法。

这是我目前的做法。它首先获取原始数组的一个视图,然后根据需要的数量进行滚动,并将核值相加以计算平均值。我知道边缘的处理不太正确,但我可以在之后解决这个问题……有没有更好更快的方法呢?我的目标是处理大小达到 5000x5000 x 16 层的大浮点数组,而 scipy.ndimage.filters.convolve 在这方面的速度比较慢。

需要注意的是,我希望使用 8 邻域连接,也就是说,一个 3x3 的滤波器会计算 9 个像素的平均值(焦点像素周围的 8 个像素),并将这个值赋给新图像中的像素。

import numpy, scipy

filtsize = 3
a = numpy.arange(100).reshape((10,10))
b = numpy.lib.stride_tricks.as_strided(a, shape=(a.size,filtsize), strides=(a.itemsize, a.itemsize))
for i in range(0, filtsize-1):
    if i > 0:
        b += numpy.roll(b, -(pow(filtsize,2)+1)*i, 0)
filtered = (numpy.sum(b, 1) / pow(filtsize,2)).reshape((a.shape[0],a.shape[1]))
scipy.misc.imsave("average.jpg", filtered)

编辑:关于我如何看待这个工作的澄清:

当前代码:

  1. 使用 stride_tricks 生成一个数组,比如 [[0,1,2],[1,2,3],[2,3,4]...],这对应于滤波器核的顶部行。
  2. 沿着垂直轴滚动,得到核的中间行 [[10,11,12],[11,12,13],[13,14,15]...],并将其加到我在第一步得到的数组中。
  3. 重复这个过程以获得核的底部行 [[20,21,22],[21,22,23],[22,23,24]...]。此时,我对每一行求和,然后除以滤波器中的元素数量,从而得到每个像素的平均值(偏移了 1 行和 1 列,边缘有些奇怪的情况,但我可以稍后处理)。

我希望能更好地利用 stride_tricks,直接获取整个数组的 9 个值或核元素的总和,或者希望有人能说服我使用其他更高效的方法……

4 个回答

5

让我们来看看:

你的问题不是很清楚,但我现在假设你想要大幅提升这种平均计算的效果。

import numpy as np
from numpy.lib import stride_tricks as st

def mf(A, k_shape= (3, 3)):
    m= A.shape[0]- 2
    n= A.shape[1]- 2
    strides= A.strides+ A.strides
    new_shape= (m, n, k_shape[0], k_shape[1])
    A= st.as_strided(A, shape= new_shape, strides= strides)
    return np.sum(np.sum(A, -1), -1)/ np.prod(k_shape)

if __name__ == '__main__':
    A= np.arange(100).reshape((10, 10))
    print mf(A)

那么,你实际期待什么样的性能提升呢?

更新:
首先,提醒一下:目前这个代码在处理“核”形状时并不太合适。不过这不是我现在最关心的问题(反正已经有了如何正确适应的思路)。

我只是直观地选择了一个4D数组的形状,对我来说,想象一个2D的“核”中心与原始2D数组A的每个网格位置对齐是很有意义的。

但这个4D的形状可能并不是“最佳”的。我认为这里真正的问题在于求和的性能。我们应该能够找到一个“最佳顺序”(对于4D数组),以充分利用你电脑的缓存架构。不过,这个顺序对于“小”数组和“大”数组可能并不相同,小数组可能更容易与电脑的缓存配合,而大数组则不一定(至少不是那么简单)。

更新2:
这里有一个稍微修改过的mf版本。显然,先将其重塑为3D数组,然后进行点积运算会更好(这样做的好处是,核的形状可以是任意的)。不过在我的机器上,这个方法的速度仍然比保罗更新的函数慢了大约3倍。

def mf(A):
    k_shape= (3, 3)
    k= np.prod(k_shape)
    m= A.shape[0]- 2
    n= A.shape[1]- 2
    strides= A.strides* 2
    new_shape= (m, n)+ k_shape
    A= st.as_strided(A, shape= new_shape, strides= strides)
    w= np.ones(k)/ k
    return np.dot(A.reshape((m, n, -1)), w)
8

我对Python不太熟悉,没法写出具体的代码,不过加快卷积运算的两种最佳方法是:分离滤波器和使用傅里叶变换。

分离滤波器:卷积的复杂度是O(M*N),其中M和N分别是图像和滤波器中的像素数量。比如,使用一个3x3的滤波器进行平均滤波,其实可以先用一个3x1的滤波器处理,再用一个1x3的滤波器处理。这样做可以提高大约30%的速度,计算公式是(3+3)/(3*3)(当然,滤波器越大,效果越明显)。当然,你在这里也可以使用一些步幅技巧。

傅里叶变换conv(A,B)等同于ifft(fft(A)*fft(B)),也就是说,直接空间中的卷积变成了傅里叶空间中的乘法,其中A是你的图像,B是你的滤波器。由于傅里叶变换的乘法要求A和B的大小相同,所以B需要是一个和A大小相同的数组,滤波器放在图像的正中心,其他地方填充为零。为了把一个3x3的滤波器放在数组的中心,你可能需要把A的大小调整为奇数。根据你实现傅里叶变换的方式,这种方法可能比直接卷积快得多(如果你多次使用同一个滤波器,还可以提前计算fft(B),这样又能节省大约30%的计算时间)。

28

这里有一些使用“高级”步幅技巧的方法。我本来昨天就想发这个,但被实际工作分心了! :)

@Paul 和 @eat 都有很不错的实现,使用了其他不同的方法。为了继续之前的问题,我想分享一下 N 维数组的对应方法。

不过,对于大于一维的数组,你很难在性能上超越 scipy.ndimage 的函数。(不过 scipy.ndimage.uniform_filter 的性能应该会比 scipy.ndimage.convolve 更好)

而且,如果你想要实现多维的滑动窗口,可能会遇到内存使用量激增的问题,尤其是当你不小心复制了数组时。虽然最初的“滚动”数组只是原始数组内存的一个视图,但任何复制数组的中间步骤都会产生一个比原始数组大得多的副本(比如说,你在处理一个 100x100 的原始数组……对于一个大小为 (3,3) 的滤波器,它的视图会是 98x98x3x3,但使用的内存和原始数组是一样的。然而,任何复制都会使用一个完整的 98x98x3x3 数组所需的内存量!!)

基本上,使用这些复杂的步幅技巧非常适合在 ndarray 的单个轴上进行向量化的滑动窗口操作。这使得计算移动标准差等操作变得非常简单,开销也很小。当你想在多个轴上进行这样的操作时,虽然也是可以的,但通常使用更专业的函数会更好。(比如 scipy.ndimage 等)

无论如何,下面是如何实现的:

import numpy as np

def rolling_window_lastaxis(a, window):
    """Directly taken from Erik Rigtorp's post to numpy-discussion.
    <http://www.mail-archive.com/numpy-discussion@scipy.org/msg29450.html>"""
    if window < 1:
       raise ValueError, "`window` must be at least 1."
    if window > a.shape[-1]:
       raise ValueError, "`window` is too long."
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

def rolling_window(a, window):
    if not hasattr(window, '__iter__'):
        return rolling_window_lastaxis(a, window)
    for i, win in enumerate(window):
        if win > 1:
            a = a.swapaxes(i, -1)
            a = rolling_window_lastaxis(a, win)
            a = a.swapaxes(-2, i)
    return a

filtsize = (3, 3)
a = np.zeros((10,10), dtype=np.float)
a[5:7,5] = 1

b = rolling_window(a, filtsize)
blurred = b.mean(axis=-1).mean(axis=-1)

所以,当我们执行 b = rolling_window(a, filtsize) 时,得到的是一个 8x8x3x3 的数组,实际上是原始 10x10 数组的内存视图。我们也可以在不同的轴上使用不同的滤波器大小,或者仅在 N 维数组的选定轴上操作(例如,在一个 4 维数组上使用 filtsize = (0,3,0,3) 会给我们一个 6 维的视图)。

然后,我们可以对最后一个轴反复应用任意函数,以有效地计算滑动窗口中的内容。

然而,由于我们在每一步的 mean(或 std 或其他)中存储的临时数组比原始数组大得多,这样做的内存效率非常低!速度也不会很快。

对于 ndimage,相应的实现是:

blurred = scipy.ndimage.uniform_filter(a, filtsize, output=a)

这将处理各种边界条件,直接在原数组上进行“模糊”处理,而不需要临时复制数组,并且速度会非常快。步幅技巧适合在一个轴上应用函数,但通常不适合在多个轴上进行操作……

这只是我的一点看法……

撰写回答