高效的循环缓冲区?

154 投票
15 回答
142338 浏览
提问于 2025-04-16 06:53

我想在Python中创建一个高效的循环缓冲区,目的是为了计算缓冲区内整数值的平均值。

这样使用列表来收集值是否高效呢?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

还有什么方法会更高效呢?(为什么呢?)

15 个回答

16

从列表的头部取出元素会导致整个列表被复制一遍,这样做效率很低。

你应该使用一个固定大小的列表或数组,并用一个索引来移动,这样在添加或删除项目时就能更有效率。

34

虽然这里已经有很多很好的回答,但我没有找到对提到的选项进行直接时间比较的内容。所以,我在下面做了一个简单的比较。

为了测试的目的,这个类可以在基于 list 的缓冲区、基于 collections.deque 的缓冲区和基于 Numpy.roll 的缓冲区之间切换。

请注意,update 方法一次只添加一个值,以保持简单。

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

在我的系统上,这样的结果是:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
291

我会使用collections.deque,并设置一个maxlen参数。

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

文档中有一个示例,跟你想要的功能很相似。我认为它是最有效率的,主要是因为它是由一群非常厉害的程序员用C语言实现的,他们总是能写出高质量的代码。

撰写回答