Python/Numpy 在掩码数组上进行2D核排序滤波的最快方法(和/或选择性排序)
给定一个二维的numpy数组
MyArray = np.array([[ 8.02, 9.54, 0.82, 7.56, 2.26, 9.47],
[ 2.68, 7.3 , 2.74, 3.03, 2.25, 8.84],
[ 2.21, 3.62, 0.55, 2.94, 5.77, 0.21],
[ 5.78, 5.72, 8.85, 0.24, 5.37, 9.9 ],
[ 9.1 , 7.21, 4.14, 9.95, 6.73, 6.08],
[ 1.8 , 5.14, 5.02, 6.52, 0.3 , 6.11]])
还有一个掩码数组
MyMask = np.array([[ 0., 0., 1., 1., 0., 1.],
[ 1., 0., 0., 0., 0., 1.],
[ 0., 0., 0., 1., 0., 0.],
[ 0., 1., 1., 1., 1., 0.],
[ 0., 1., 0., 1., 0., 0.],
[ 0., 1., 0., 0., 1., 1.]])
我想要运行一个“孔洞”中值滤波器,这个滤波器会忽略被掩码的元素。
举个例子,使用一个带有内核的秩滤波器
k = np.array([[ 1, 1, 1],
[ 1, 0, 1],
[ 1, 1, 1]]);
会在MyArray
上运行:对每个MyArray
的元素,按照内核定义的邻域进行排序,然后只返回非掩码元素的中值(如果数组的元素个数是偶数,则取平均)。
目前,我是通过不太优雅的循环来实现这个功能,使用bottleneck.nanmedian,并把掩码映射到NaN。这确实能满足我的需求,但我希望能依赖于二维数组的操作方法。
scipy.signal.order_filter
和scipy.ndimage.filters.rank_filter
都可以使用(rank_filter似乎快得多),但是它们在返回秩的时候,会把NaN
和Inf
排在数组的最前面,从而影响结果。看起来这两种方法都不支持numpy.ma数组(掩码),也不接受选择性秩的数组(这样我可以把所有掩码填充为0并调整我的秩),而且没有明显的方法可以为每个位置改变内核。
我在想我是否错过了某个组合和/或Python的特性,或者我是否应该考虑在Cython中实现一个新的例程。
忽略边界处理,上述问题的内部点将是
[[ 0. 0. 0. 0. 0. 0. ]
[ 0. 3.18 3.62 2.26 2.645 0. ]
[ 0. 2.74 3.325 2.74 2.64 0. ]
[ 0. 3.88 3.62 4.955 6.08 0. ]
[ 0. 5.02 5.77 5.77 6.52 0. ]
[ 0. 0. 0. 0. 0. 0. ]]
1 个回答
4
一种方法是牺牲一些内存使用,来避免使用Python的循环。也就是说,我们把原来的数组扩大,这样就可以一次性对所有的子数组进行过滤。这种做法有点像Numpy的广播机制。
在我测试的情况下,对于一个1000x1000的数组,使用这种向量化的函数大约快了100倍。
在我的代码中,我使用了NaN
来进行掩码处理,但如果多写几行代码,你也可以使用numpy.ma
数组。而且我没有nanmedian
这个函数,所以我用了nanmean
,性能应该是差不多的。
import numpy as np
from numpy.lib.stride_tricks import as_strided
# test data
N = 1000
A = np.random.rand(N, N)*10
mask = np.random.choice([True, False], size=(N, N))
def filter_loop(A, mask):
kernel = np.array([[1,1,1],[1,0,1],[1,1,1]], bool)
A = A.copy()
A[mask] = np.nan
N = A.shape[0] - 2 # assuming square matrix
out = np.empty((N, N))
for i in xrange(N):
for j in xrange(N):
out[i,j] = np.nanmean(A[i:i+3, j:j+3][kernel])
return out
def filter_broadcast(A, mask):
A = A.copy()
A[mask] = np.nan
N = A.shape[0] - 2
B = as_strided(A, (N, N, 3, 3), A.strides+A.strides)
B = B.copy().reshape((N, N, 3*3))
B[:,:,4] = np.nan
return np.nanmean(B, axis=2)