如何有效计算运行中值

2024-05-12 16:32:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我借了一些代码来实现一个函数来计算大量数据的运行中值。当前的一个对我来说太慢了(棘手的部分是,我需要从运行框中排除所有零)。下面是代码:

from itertools import islice
from collections import deque
from bisect import bisect_left,insort

def median(s):
    sp = [nz for nz in s if nz!=0]
    print sp
    Mnow = len(sp)
    if Mnow == 0:
        return 0
    else:
        return np.median(sp)

def RunningMedian(seq, M):
    seq = iter(seq)
    s = []

    # Set up list s (to be sorted) and load deque with first window of seq
    s = [item for item in islice(seq,M)]
    d = deque(s)

    # Sort it in increasing order and extract the median ("center" of the sorted window)
    s.sort()
    medians = [median(s)]
    for item in seq:
        old = d.popleft()          # pop oldest from left
        d.append(item)             # push newest in from right
        del s[bisect_left(s, old)] # locate insertion point and then remove old 
        insort(s, item)            # insert newest such that new sort is not required        
        medians.append(median(s))
    return medians

它工作得很好,唯一的缺点是速度太慢。有谁能帮我提高代码的效率?

在我探索了所有可能的方法之后,下面的简单代码可以比较有效地进行计算:

def RunningMedian(x,N):
    idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
    b = [row[row>0] for row in x[idx]]
    return np.array(map(np.median,b))
    #return np.array([np.median(c) for c in b])  # This also works

Tags: 代码infromimportforreturndefnp
2条回答

一种方法如下:

def RunningMedian(x,N):
    idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
    b = [row[row>0] for row in x[idx]]
    return np.array(map(np.median,b))
    #return np.array([np.median(c) for c in b])  # This also works

我发现了一个快得多的,复制如下:

from collections import deque
from bisect import insort, bisect_left
from itertools import islice
def running_median_insort(seq, window_size):
    """Contributed by Peter Otten"""
    seq = iter(seq)
    d = deque()
    s = []
    result = []
    for item in islice(seq, window_size):
        d.append(item)
        insort(s, item)
        result.append(s[len(d)//2])
    m = window_size // 2
    for item in seq:
        old = d.popleft()
        d.append(item)
        del s[bisect_left(s, old)]
        insort(s, item)
        result.append(s[m])
    return result

看看链接:running_median

在处理数据样本时,可以使用两个heaps来维护数据样本的下半部分和上半部分。算法是这样的:对于每个值,将其放在一个适当的堆中,并“重新平衡”堆,以便它们的大小相差不超过1。然后,为了得到中间值,只需从更大的堆中取出第一个元素,或者取两个堆中第一个元素的平均值(如果它们大小相等)。此解决方案具有O(n log(n))时间复杂性。

from heapq import heappush, heappop


class RunningMedian:
    def __init__(self):
        self.lowers, self.highers = [], []

    def add_number(self, number):
        if not self.highers or number > self.highers[0]:
            heappush(self.highers, number)
        else:
            heappush(self.lowers, -number)  # for lowers we need a max heap
        self.rebalance()

    def rebalance(self):
        if len(self.lowers) - len(self.highers) > 1:
            heappush(self.highers, -heappop(self.lowers))
        elif len(self.highers) - len(self.lowers) > 1:
            heappush(self.lowers, -heappop(self.highers))

    def get_median(self):
        if len(self.lowers) == len(self.highers):
            return (-self.lowers[0] + self.highers[0])/2
        elif len(self.lowers) > len(self.highers):
            return -self.lowers[0]
        else:
            return self.highers[0]

演示:

>>> running_median = RunningMedian()
>>> for n in (12, 4, 5, 3, 8, 7):
...     running_median.add_number(n)
...     print(running_median.get_median())
... 
12
8.0
5
4.5
5
6.0

相关问题 更多 >