Numpy 向量化算法找到比当前元素大的第一个未来元素

4 投票
2 回答
1128 浏览
提问于 2025-04-17 05:00

我有一个时间序列A。我想生成另一个时间序列B,要求:

B[i] = j,其中j是第一个比i大的索引,使得A[j]大于A[i]。

有没有什么快速的方法可以在numpy中实现这个?

谢谢。

[编辑过]: 最好只使用O(n)的空间。

2 个回答

2

@Winston Ewert 的 future8 方法是 O(n) 的复杂度(!),比我们之前讨论的所有方案都要好。要证明它是 O(n),可以观察到内部的 while 循环对于任何值的 B[target] 最多只会执行一次。

我之前的回答:

这里有三种方法的性能对比(这是我和 @Winston Ewert 之间的讨论结果):

  1. 使用二分查找的升序列表。(future2)
  2. 完整列表(future6,由 @Winston Ewert 提供)
  3. numpy.vectorize(future7,是 @Winston Ewert 的 future5 的增强版)。

在不同情况下,这三种方法的速度差异很大。如果数据是随机的,那么“完整列表”(future6)是最快的。如果数据有波动,那么“升序列表”(future2)是最快的。如果数据有上升趋势,那么“vectorize”(future7)是最快的。

如果数据是股票报价,我会选择“vectorize”(future7),因为股票通常有上升的趋势,而且这种方法简单,在各种情况下表现都不错。

输出:

Random series:
future2 ascends  : 0.210215095646
future6 full list: 0.0920153693145
future7 vectorize: 0.138747922771
Oscillating series:
future2 ascends  : 0.208349650159
future6 full list: 0.940276050999
future7 vectorize: 0.597290143496
Ascending trend series:
future2 ascends  : 0.131106233627
future6 full list: 20.7201531342
future7 vectorize: 0.0540951244451

代码:

import numpy as np
import time 
import timeit

def future2(A):    
    def reverse_enum(L):
        for index in reversed(xrange(len(L))):
            yield len(L)-index-1, L[index]
    def findnext(x, A, ascends): # find index of first future number greater than x
        for idx, segment in reverse_enum(ascends):
            joff=A[segment[0]:segment[1]+1].searchsorted(x,side='right') # binary search
            if joff < (segment[1]-segment[0]+1):
                j=segment[0]+joff
                [ascends.pop() for _ in range(idx)] # delete previous segments
                segment[0]=j # cut beginning of segment 
                return j
        return -1
    B = np.arange(len(A))+1
    # Note: B[i]=-1 where there is no greater value in the future.
    B[-1] = -1 # put -1 at the end
    ascends = [] # a list of pairs of indexes, ascending segments of A
    maximum = True
    for i in xrange(len(A)-2,-1,-1): # scan backwards
        #print(ascends)
        if A[i] < A[i+1]:
            if maximum:
                ascends.append([i+1,i+1])
                maximum = False
            else:
                ascends[-1][0] = i+1
        else:# A[i] >= A[i+1]
            B[i] = findnext(A[i], A, ascends)
            maximum = True
    return B


def future6(A):
    # list of tuples (index into A, value in A)
    # invariant: indexes and values in sorted order
    known = []
    result = []
    for idx in xrange(len(A) - 1, -1, -1):
        value = A[idx]
        # since known is sorted a binary search could be applied here
        # I haven't bothered

        # anything lower then the current value
        # cannot possibly be used again, since this value will be first index instead
        # of those
        known = [(x,y) for x,y in known if y > value]


        if known: 
            # all values in known are > current value
            # they reverse sorted by index               
            # the value with the lowest index is first
            result.append(known[-1][0])
        else:
            # no values exist this high, report -1
            result.append(-1)
        # add to end of the list to maintain invariant
        known.append( (idx, value) )

    # let numpy worry about reversing the array
    return np.array(result)[::-1]


def future7(A):
    @np.vectorize
    def values(i):
        for idx, v in enumerate(A[i+1:]): # loop is faster than genexp with exception
            if A[i]<v:
                return idx+i+1
        return -1
    return values(np.arange(len(A)))

if __name__ == '__main__':
    print('Random series:')
    tsetup = """import future; import numpy; A = numpy.random.random(1e4)"""
    t = timeit.timeit('future.future2(A)', tsetup, number=3)
    print('future2 ascends  : '+str(t))
    t = timeit.timeit('future.future6(A)', tsetup, number=3)
    print('future6 full list: '+str(t))
    t = timeit.timeit('future.future7(A)', tsetup, number=3)
    print('future7 vectorize: '+str(t))

    print('Oscillating series:')
    tsetup = """import future; import numpy; A = numpy.random.randint(1e5,size=1e4)-5e4; A = A.cumsum()"""
    t = timeit.timeit('future.future2(A)', tsetup, number=3)
    print('future2 ascends  : '+str(t))
    t = timeit.timeit('future.future6(A)', tsetup, number=3)
    print('future6 full list: '+str(t))
    t = timeit.timeit('future.future7(A)', tsetup, number=3)
    print('future7 vectorize: '+str(t))

    print('Ascending trend series:')
    tsetup = """import future; import numpy; A = numpy.random.randint(1e5,size=1e4)-3e4; A = A.cumsum()"""
    t = timeit.timeit('future.future2(A)', tsetup, number=3)
    print('future2 ascends  : '+str(t))
    t = timeit.timeit('future.future6(A)', tsetup, number=3)
    print('future6 full list: '+str(t))
    t = timeit.timeit('future.future7(A)', tsetup, number=3)
    print('future7 vectorize: '+str(t))
9

测试不够充分,使用需谨慎。

import numpy

a = numpy.random.random(100)

# a_by_a[i,j] = a[i] > a[j]
a_by_a = a[numpy.newaxis,:] > a[:,numpy.newaxis]
# by taking the upper triangular, we ignore all cases where i < j
a_by_a = numpy.triu(a_by_a)
# argmax will give the first index with the highest value (1 in this case)
print numpy.argmax(a_by_a, axis = 1)

内存占用较低的版本:

a = numpy.random.random(100)

@numpy.vectorize
def values(i):
    try:
        return (a[i:] > a[i]).nonzero()[0][0] + i
    except IndexError:
        return -1 # no valid values found

b = values(numpy.arange(100))

更快的版本:

@np.vectorize
def values(i):
    try:
        return next(idx for idx, value in enumerate(A[i+1:]) if value > A[i]) + i + 1
    except StopIteration:
        return -1 # no valid values found
return values(np.arange(len(A)))

更更快的版本:

def future6(A):
    # list of tuples (index into A, value in A)
    # invariant: indexes and values in sorted order
    known = []
    result = []
    for idx in xrange(len(A) - 1, -1, -1):
        value = A[idx]
        # since known is sorted a binary search could be applied here
        # I haven't bothered

        # anything lower then the current value
        # cannot possibly be used again, since this value will be first index instead
        # of those
        known = [(x,y) for x,y in known if y > value]


        if known: 
            # all values in known are > current value
            # they reverse sorted by index               
            # the value with the lowest index is first
            result.append(known[-1][0])
        else:
            # no values exist this high, report -1
            result.append(-1)
        # add to end of the list to maintain invariant
        known.append( (idx, value) )

    # let numpy worry about reversing the array
    return np.array(result)[::-1]

感谢cyborg提供的一些思路。

算法差异

cyborg展示了不同算法在处理不同数据时的显著差异。我收集了一些运行这些算法的数据,想看看发生了什么。

随机数据:

Average distance between value and its target: 9
Average length of ascends list: 24
Average length of segment in ascends list: 1.33
Average length of known list: 9.1

由于列表很短,升序算法大多数情况下退化为线性搜索。它确实能清除掉未来无法使用的升序部分,所以比线性搜索要好一些。

震荡数据:

Average distance between value and its target: 31.46
Average length of ascends list: 84
Average length of segment in ascends list: 1.70
Average length of known list: 57.98

震荡的数据往往会让不同的部分分得更远。这自然会影响线性搜索算法。两个“更聪明”的算法需要跟踪额外的数据。我的算法每次扫描数据时性能下降得很厉害,而升序算法接触的数据较少,表现得更好。

升序数据:

Average distance between value and its target: 2.57
Average length of ascends list: 40
Average length of segment in ascends list: 3.27
Average length of known list: 3037.97

我的算法出现问题是显而易见的,因为它需要跟踪大量的升序值。目标值和实际值之间的短距离解释了线性搜索的良好表现。升序算法在处理非常长的段落时仍然不太有效。

更好的算法

我的算法没有必要对数据进行线性搜索。数据是有序的,我们只需要从列表的末尾移除小值。

def future6(A):
    # list of tuples (index into A, value in A)
    # invariant: indexes and values in sorted order
    known = []
    result = []
    for idx in xrange(len(A) - 1, -1, -1):
        value = A[idx]
        # since known is sorted a binary search could be applied here
        # I haven't bothered

        # anything lower then the current value
        # cannot possibly be used again, since this value will be first index instead
        # of those
        while known and known[-1][1] < value:
            known.pop()


        if known: 
            # all values in known are > current value
            # they reverse sorted by index               
            # the value with the lowest index is first
            result.append(known[-1][0])
        else:
            # no values exist this high, report -1
            result.append(-1)
        # add to end of the list to maintain invariant
        known.append( (idx, value) )

    # let numpy worry about reversing the array
    return np.array(result)[::-1]

但我想到我们可以重用之前计算的B值,而不是构建新的数据结构。如果j > i,且A[i] > A[j],那么B[i] > B[j]。

def future8(A):
    B = [-1] * len(A)
    for index in xrange(len(A)-2, -1, -1):
        target = index + 1
        value = A[index]
        while target != -1 and A[target] < value:
            target = B[target]
        B[index] = target
    return np.array(B)

我的基准测试结果:

Random series:
future2 ascends  : 0.242569923401
future6 full list: 0.0363488197327
future7 vectorize: 0.129994153976
future8 reuse: 0.0299410820007
Oscillating series:
future2 ascends  : 0.233623981476
future6 full list: 0.0360488891602
future7 vectorize: 1.19140791893
future8 reuse: 0.0297570228577
Ascending trend series:
future2 ascends  : 0.120707035065
future6 full list: 0.0314049720764
future7 vectorize: 0.0640320777893
future8 reuse: 0.0246520042419

升序段落

cyborg有一个很有趣的想法,就是利用升序段落。我觉得他的测试案例并没有真正展现出他想要的效果。我认为这些段落的长度不够,无法充分利用。但我想真实数据中可能会有这样的段落,所以利用它会非常有帮助。

不过我觉得这可能行不通。准备进行二分搜索所需的数据需要O(n)的时间。如果我们多次进行二分搜索,这样是可以的,但一旦我们在升序段落的中间找到一个值,就不会再回到左边的任何部分。因此,即使使用二分搜索,我们处理数据的时间最多也要O(n)。

如果构建所需数据的成本低于后续扫描升序段落的成本,那可能会有效。但扫描的成本相对较低,要想找到一种处理升序段落的方式,成本更低是很难的。

撰写回答