在numpy数组中找到相同值序列的长度(游程编码)
在一个pylab程序中(可能也可以是matlab程序),我有一个numpy数组,里面存的是距离的数据:d[t]
表示在时间t
时的距离(而我的数据总共有len(d)
个时间单位)。
我关心的事件是距离低于某个阈值的情况,我想计算这些事件持续的时间。获取一个布尔数组很简单,方法是用b = d<threshold
,这样就能知道哪些时刻距离低于阈值。但问题在于,我需要计算这个布尔数组中连续为True的部分的长度。我不知道怎么高效地做到这一点(也就是使用numpy的基本功能),于是我只能手动遍历数组,检测变化(也就是当值从False变为True时初始化计数器,值为True时计数器加一,当值变回False时输出计数器的值)。但这样做非常慢。
如何高效地在numpy数组中检测这种序列?
下面是一些python代码,展示了我的问题:第四个点出现的时间非常长(如果没有,试着增加数组的大小)。
from pylab import *
threshold = 7
print '.'
d = 10*rand(10000000)
print '.'
b = d<threshold
print '.'
durations=[]
for i in xrange(len(b)):
if b[i] and (i==0 or not b[i-1]):
counter=1
if i>0 and b[i-1] and b[i]:
counter+=1
if (b[i-1] and not b[i]) or i==len(b)-1:
durations.append(counter)
print '.'
7 个回答
这里有一个只用数组的解决方案:它接受一个包含布尔值(真或假)的数组,并计算出变化的长度。
>>> from numpy import array, arange
>>> b = array([0,0,0,1,1,1,0,0,0,1,1,1,1,0,0], dtype=bool)
>>> sw = (b[:-1] ^ b[1:]); print sw
[False False True False False True False False True False False False
True False]
>>> isw = arange(len(sw))[sw]; print isw
[ 2 5 8 12]
>>> lens = isw[1::2] - isw[::2]; print lens
[3 4]
sw
数组中,真值表示有一个开关,isw
则把这些开关的位置转换成索引。接着,lens
数组中的元素是通过成对相减得到的。
需要注意的是,如果这个序列是以 1 开头的,它会计算 0 的序列长度:这个问题可以通过调整索引来解决,以便正确计算 lens
。另外,我还没有测试一些特殊情况,比如长度为 1 的序列。
这是一个完整的函数,它返回所有 True
子数组的起始位置和长度。
import numpy as np
def count_adjacent_true(arr):
assert len(arr.shape) == 1
assert arr.dtype == np.bool
if arr.size == 0:
return np.empty(0, dtype=int), np.empty(0, dtype=int)
sw = np.insert(arr[1:] ^ arr[:-1], [0, arr.shape[0]-1], values=True)
swi = np.arange(sw.shape[0])[sw]
offset = 0 if arr[0] else 1
lengths = swi[offset+1::2] - swi[offset:-1:2]
return swi[offset:-1:2], lengths
这个函数已经在不同的布尔一维数组上进行了测试(包括空数组;单个或多个元素;偶数或奇数长度;以 True
或 False
开头;以及只有 True
或 False
元素的数组)。
这是一个完全使用numpy库的向量化和通用的RLE(游程编码)方法,可以处理任何类型的数组(包括字符串、布尔值等)。
它的输出是一个包含游程长度、起始位置和对应值的元组。
import numpy as np
def rle(inarray):
""" run length encoding. Partial credit to R rle function.
Multi datatype arrays catered for including non Numpy
returns: tuple (runlengths, startpositions, values) """
ia = np.asarray(inarray) # force numpy
n = len(ia)
if n == 0:
return (None, None, None)
else:
y = ia[1:] != ia[:-1] # pairwise unequal (string safe)
i = np.append(np.where(y), n - 1) # must include last element posi
z = np.diff(np.append(-1, i)) # run lengths
p = np.cumsum(np.append(0, z))[:-1] # positions
return(z, p, ia[i])
运行速度相当快(使用i7处理器):
xx = np.random.randint(0, 5, 1000000)
%timeit yy = rle(xx)
100 loops, best of 3: 18.6 ms per loop
可以处理多种数据类型:
rle([True, True, True, False, True, False, False])
Out[8]:
(array([3, 1, 1, 2]),
array([0, 3, 4, 5]),
array([ True, False, True, False], dtype=bool))
rle(np.array([5, 4, 4, 4, 4, 0, 0]))
Out[9]: (array([1, 4, 2]), array([0, 1, 5]), array([5, 4, 0]))
rle(["hello", "hello", "my", "friend", "okay", "okay", "bye"])
Out[10]:
(array([2, 1, 1, 2, 1]),
array([0, 2, 3, 4, 6]),
array(['hello', 'my', 'friend', 'okay', 'bye'],
dtype='|S6'))
与上面提到的Alex Martelli的结果相同:
xx = np.random.randint(0, 2, 20)
xx
Out[60]: array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1])
am = runs_of_ones_array(xx)
tb = rle(xx)
am
Out[63]: array([4, 5, 2, 5])
tb[0][tb[2] == 1]
Out[64]: array([4, 5, 2, 5])
%timeit runs_of_ones_array(xx)
10000 loops, best of 3: 28.5 µs per loop
%timeit rle(xx)
10000 loops, best of 3: 38.2 µs per loop
虽然比Alex的稍微慢一点(但仍然非常快),而且灵活性更高。
虽然这些不是 numpy
的基本功能,但 itertools
的函数通常运行得很快,所以可以试试这个方法(当然,记得对比不同解决方案的运行时间):
def runs_of_ones(bits):
for bit, group in itertools.groupby(bits):
if bit: yield sum(group)
如果你需要把结果放到一个列表里,可以直接用 list(runs_of_ones(bits));不过,使用列表推导式可能会稍微快一点:
def runs_of_ones_list(bits):
return [sum(g) for b, g in itertools.groupby(bits) if b]
接下来,看看“numpy本地”的方法,怎么样呢:
def runs_of_ones_array(bits):
# make sure all runs of ones are well-bounded
bounded = numpy.hstack(([0], bits, [0]))
# get 1 at run starts and -1 at run ends
difs = numpy.diff(bounded)
run_starts, = numpy.where(difs > 0)
run_ends, = numpy.where(difs < 0)
return run_ends - run_starts
再次提醒:一定要在符合你实际情况的例子中对比各个解决方案的性能!