Python中的统计累加器
统计累加器可以让我们进行增量计算。举个例子,如果我们想计算一串数字的平均值,而这些数字是在不同时间给出的,我们可以创建一个对象来记录当前的数字个数n
和它们的总和sum
。当我们想要得到平均值时,这个对象只需要返回sum/n
就可以了。
这样的累加器的好处在于,当我们收到一个新数字时,不需要重新计算所有数字的总和和个数。
类似的累加器也可以用来计算其他统计数据(比如可以参考boost库中的C++实现)。
那么,如何在Python中实现累加器呢?我想到的代码是:
class Accumulator(object):
"""
Used to accumulate the arithmetic mean of a stream of
numbers. This implementation does not allow to remove items
already accumulated, but it could easily be modified to do
so. also, other statistics could be accumulated.
"""
def __init__(self):
# upon initialization, the numnber of items currently
# accumulated (_n) and the total sum of the items acumulated
# (_sum) are set to zero because nothing has been accumulated
# yet.
self._n = 0
self._sum = 0.0
def add(self, item):
# the 'add' is used to add an item to this accumulator
try:
# try to convert the item to a float. If you are
# successful, add the float to the current sum and
# increase the number of accumulated items
self._sum += float(item)
self._n += 1
except ValueError:
# if you fail to convert the item to a float, simply
# ignore the exception (pass on it and do nothing)
pass
@property
def mean(self):
# the property 'mean' returns the current mean accumulated in
# the object
if self._n > 0:
# if you have more than zero items accumulated, then return
# their artithmetic average
return self._sum / self._n
else:
# if you have no items accumulated, return None (you could
# also raise an exception)
return None
# using the object:
# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated
# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0
# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5
# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')
在设计上会出现一些有趣的问题:
- 如何让累加器是线程安全的?
- 如何安全地移除某些项目?
- 如何设计架构,以便可以轻松地插入其他统计功能(比如一个统计工厂)?
2 个回答
如果我用Python来做这件事,我会有两点不同的做法:
- 把每个累加器的功能分开。
- 不以你那种方式使用@property。
关于第一点,我可能会想出一个用于执行累加的接口,可能像这样:
def add(self, num) # add a number
def compute(self) # compute the value of the accumulator
然后我会创建一个累加器注册表(AccumulatorRegistry),用来保存这些累加器,并允许用户调用操作并对所有累加器进行添加。代码可能看起来像这样:
class Accumulators(object):
_accumulator_library = {}
def __init__(self):
self.accumulator_library = {}
for key, value in Accumulators._accumulator_library.items():
self.accumulator_library[key] = value()
@staticmethod
def register(name, accumulator):
Accumulators._accumulator_library[name] = accumulator
def add(self, num):
for accumulator in self.accumulator_library.values():
accumulator.add(num)
def compute(self, name):
self.accumulator_library[name].compute()
@staticmethod
def register_decorator(name):
def _inner(cls):
Accumulators.register(name, cls)
return cls
@Accumulators.register_decorator("Mean")
class Mean(object):
def __init__(self):
self.total = 0
self.count = 0
def add(self, num):
self.count += 1
self.total += num
def compute(self):
return self.total / float(self.count)
我应该提一下你关于线程安全的问题。Python的全局解释器锁(GIL)可以保护你免受很多线程问题的困扰。不过,你可能还是需要做一些事情来保护自己:
- 如果这些对象只在一个线程中使用,可以使用threading.local。
- 如果不是,你可以用锁来包裹操作,使用with语法来自动处理锁的获取和释放。
为了创建一个通用的、线程安全的高级函数,你可以结合使用 Queue.Queue
类和其他一些工具,像下面这样:
from Queue import Empty
def Accumulator(f, q, storage):
"""Yields successive values of `f` over the accumulation of `q`.
`f` should take a single iterable as its parameter.
`q` is a Queue.Queue or derivative.
`storage` is a persistent sequence that provides an `append` method.
`collections.deque` may be particularly useful, but a `list` is quite acceptable.
>>> from Queue import Queue
>>> from collections import deque
>>> from threading import Thread
>>> def mean(it):
... vals = tuple(it)
... return sum(it) / len(it)
>>> value_queue = Queue()
>>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
>>> def add_to_queue(it, queue):
... for value in it:
... value_queue.put(value)
>>> putting_thread = Thread(target=add_to_queue,
... args=(range(0, 12, 2), value_queue))
>>> putting_thread.start()
>>> list(LastThreeAverage)
[0, 1, 2, 4, 6, 8]
"""
try:
while True:
storage.append(q.get(timeout=0.1))
q.task_done()
yield f(storage)
except Empty:
pass
这个生成器函数大部分责任都交给了其他部分来处理:
- 它依赖于
Queue.Queue
来以线程安全的方式提供源数据 - 可以将一个
collections.deque
对象作为storage
参数传入;这样可以方便地只使用最后的n
(在这个例子中是3)个值 - 函数本身(在这里是
mean
)作为参数传入。这在某些情况下可能会导致代码效率不高,但可以适用于各种情况。
需要注意的是,如果你的生产者线程处理每个值的时间超过0.1秒,累加器可能会超时。你可以通过传入更长的超时时间来解决这个问题,或者完全去掉超时参数。在后者的情况下,函数会在队列末尾无限期阻塞;这种用法在子线程(通常是 daemon
线程)中更有意义。当然,你也可以将传递给 q.get
的参数作为 Accumulator
的第四个参数进行参数化。
如果你想从生产者线程(这里是 putting_thread
)传达队列结束的信息,也就是没有更多值要来了,你可以传递并检查一个哨兵值,或者使用其他方法。更多信息可以在 这个讨论串中找到;我选择写了一个名为 CloseableQueue 的 Queue.Queue
子类,提供了一个 close
方法。
你还可以通过限制队列大小等方式自定义这个函数的行为;这只是一个用法示例。
编辑
如上所述,这种方法因为需要重新计算而损失了一些效率,而且我认为并没有真正回答你的问题。
生成器函数还可以通过它的 send
方法接收值。所以你可以写一个平均值生成器函数,如下:
def meangen():
"""Yields the accumulated mean of sent values.
>>> g = meangen()
>>> g.send(None) # Initialize the generator
>>> g.send(4)
4.0
>>> g.send(10)
7.0
>>> g.send(-2)
4.0
"""
sum = yield(None)
count = 1
while True:
sum += yield(sum / float(count))
count += 1
这里的 yield 表达式既将值(传给 send
的参数)引入函数,同时又将计算出的值作为 send
的返回值传出。
你可以将调用该函数返回的生成器传递给一个更易优化的累加器生成器函数,如下所示:
def EfficientAccumulator(g, q):
"""Similar to Accumulator but sends values to a generator `g`.
>>> from Queue import Queue
>>> from threading import Thread
>>> value_queue = Queue()
>>> g = meangen()
>>> g.send(None)
>>> mean_accumulator = EfficientAccumulator(g, value_queue)
>>> def add_to_queue(it, queue):
... for value in it:
... value_queue.put(value)
>>> putting_thread = Thread(target=add_to_queue,
... args=(range(0, 12, 2), value_queue))
>>> putting_thread.start()
>>> list(mean_accumulator)
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
"""
try:
while True:
yield(g.send(q.get(timeout=0.1)))
q.task_done()
except Empty:
pass