Python/Numpy 在掩码数组上进行2D核排序滤波的最快方法(和/或选择性排序)

8 投票
1 回答
2830 浏览
提问于 2025-04-18 07:23

给定一个二维的numpy数组

MyArray = np.array([[ 8.02,  9.54,  0.82,  7.56,  2.26,  9.47],
           [ 2.68,  7.3 ,  2.74,  3.03,  2.25,  8.84],
           [ 2.21,  3.62,  0.55,  2.94,  5.77,  0.21],
           [ 5.78,  5.72,  8.85,  0.24,  5.37,  9.9 ],
           [ 9.1 ,  7.21,  4.14,  9.95,  6.73,  6.08],
           [ 1.8 ,  5.14,  5.02,  6.52,  0.3 ,  6.11]])

还有一个掩码数组

MyMask =  np.array([[ 0.,  0.,  1.,  1.,  0.,  1.],
            [ 1.,  0.,  0.,  0.,  0.,  1.],
            [ 0.,  0.,  0.,  1.,  0.,  0.],
            [ 0.,  1.,  1.,  1.,  1.,  0.],
            [ 0.,  1.,  0.,  1.,  0.,  0.],
            [ 0.,  1.,  0.,  0.,  1.,  1.]])

我想要运行一个“孔洞”中值滤波器,这个滤波器会忽略被掩码的元素。

举个例子,使用一个带有内核的秩滤波器

k = np.array([[ 1, 1, 1],
              [ 1, 0, 1],
              [ 1, 1, 1]]); 

会在MyArray上运行:对每个MyArray的元素,按照内核定义的邻域进行排序,然后只返回非掩码元素的中值(如果数组的元素个数是偶数,则取平均)。

目前,我是通过不太优雅的循环来实现这个功能,使用bottleneck.nanmedian,并把掩码映射到NaN。这确实能满足我的需求,但我希望能依赖于二维数组的操作方法。

scipy.signal.order_filterscipy.ndimage.filters.rank_filter都可以使用(rank_filter似乎快得多),但是它们在返回秩的时候,会把NaNInf排在数组的最前面,从而影响结果。看起来这两种方法都不支持numpy.ma数组(掩码),也不接受选择性秩的数组(这样我可以把所有掩码填充为0并调整我的秩),而且没有明显的方法可以为每个位置改变内核。

我在想我是否错过了某个组合和/或Python的特性,或者我是否应该考虑在Cython中实现一个新的例程。

忽略边界处理,上述问题的内部点将是

[[ 0.     0.     0.     0.     0.     0.   ]
 [ 0.     3.18   3.62   2.26   2.645  0.   ]
 [ 0.     2.74   3.325  2.74   2.64   0.   ]
 [ 0.     3.88   3.62   4.955  6.08   0.   ]
 [ 0.     5.02   5.77   5.77   6.52   0.   ]
 [ 0.     0.     0.     0.     0.     0.   ]]

1 个回答

4

一种方法是牺牲一些内存使用,来避免使用Python的循环。也就是说,我们把原来的数组扩大,这样就可以一次性对所有的子数组进行过滤。这种做法有点像Numpy的广播机制

在我测试的情况下,对于一个1000x1000的数组,使用这种向量化的函数大约快了100倍。

在我的代码中,我使用了NaN来进行掩码处理,但如果多写几行代码,你也可以使用numpy.ma数组。而且我没有nanmedian这个函数,所以我用了nanmean,性能应该是差不多的。

import numpy as np
from numpy.lib.stride_tricks import as_strided

# test data
N = 1000
A = np.random.rand(N, N)*10
mask = np.random.choice([True, False], size=(N, N))

def filter_loop(A, mask):
    kernel = np.array([[1,1,1],[1,0,1],[1,1,1]], bool)
    A = A.copy()
    A[mask] = np.nan
    N = A.shape[0] - 2  # assuming square matrix
    out = np.empty((N, N))
    for i in xrange(N):
        for j in xrange(N):
            out[i,j] = np.nanmean(A[i:i+3, j:j+3][kernel])
    return out    

def filter_broadcast(A, mask):
    A = A.copy()
    A[mask] = np.nan
    N = A.shape[0] - 2
    B = as_strided(A, (N, N, 3, 3), A.strides+A.strides)
    B = B.copy().reshape((N, N, 3*3))
    B[:,:,4] = np.nan
    return np.nanmean(B, axis=2)

撰写回答