如何在Python 3中计算移动平均?
假设我有一个列表:
y = ['1', '2', '3', '4','5','6','7','8','9','10']
我想创建一个函数来计算移动的n天平均值。比如说,如果 n
是5,我希望我的代码能计算出第1到第5天的平均值,先把这几天的数加起来,然后算出平均值,这样得到的结果是3.0。接着再计算第2到第6天的平均值,结果是4.0,然后是第3到第7天,依此类推,直到第6到第10天。
我不想计算前面n-1天的平均值,所以从第n天开始,才会计算之前的几天。
def moving_average(x:'list of prices', n):
for num in range(len(x)+1):
print(x[num-n:num])
这段代码似乎能输出我想要的结果:
[]
[]
[]
[]
[]
['1', '2', '3', '4', '5']
['2', '3', '4', '5', '6']
['3', '4', '5', '6', '7']
['4', '5', '6', '7', '8']
['5', '6', '7', '8', '9']
['6', '7', '8', '9', '10']
不过,我不知道怎么计算那些列表里的数字。有什么好主意吗?
5 个回答
2
一种避免重新计算中间和的方法。
list=range(0,12)
def runs(v):
global runningsum
runningsum+=v
return(runningsum)
runningsum=0
runsumlist=[ runs(v) for v in list ]
result = [ (runsumlist[k] - runsumlist[k-5])/5 for k in range(0,len(list)+1)]
打印结果
[2,3,4,5,6,7,8,9]
让我们运行(int(v)) .. 然后 .. repr( runsumlist[k] - runsumlist[k-5])/5 ) 如果你想把数字当作字符串来处理的话。
没有全局变量的替代方案:
list = [float[x] for x in range(0,12)]
nave = 5
movingave = sum(list[:nave]/nave)
for i in range(len(list)-nave):movingave.append(movingave[-1]+(list[i+nave]-list[i])/nave)
print movingave
确保即使输入的值是整数,也要进行浮点数运算。
[2.0,3.0,4.0,5.0,6.0,7.0,8.0,9,0]
7
我很喜欢Martijn的回答,不过像乔治一样,我在想,使用一个运行中的累加和会不会比一次又一次地对差不多相同的数字使用sum()
要快一些。
另外,在逐步增加的阶段使用None
作为默认值的想法也很有趣。实际上,关于移动平均数可能会有很多不同的场景。我们可以把平均数的计算分成三个阶段:
- 逐步增加:开始的迭代次数小于窗口大小
- 稳定进展:我们有正好窗口大小数量的元素来计算正常的
average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
- 逐步减少:在输入数据的末尾,我们可以返回另外
window_size - 1
个“平均”数字。
这里有一个函数,它接受:
- 任意可迭代对象(生成器也可以)作为数据输入
- 任意大于等于1的窗口大小
- 在逐步增加/减少阶段开关值生成的参数
- 用于这些阶段的回调函数,以控制值的生成方式。这可以用来不断提供一个默认值(例如
None
)或提供部分平均值
下面是代码:
from collections import deque
def moving_averages(data, size, rampUp=True, rampDown=True):
"""Slide a window of <size> elements over <data> to calc an average
First and last <size-1> iterations when window is not yet completely
filled with data, or the window empties due to exhausted <data>, the
average is computed with just the available data (but still divided
by <size>).
Set rampUp/rampDown to False in order to not provide any values during
those start and end <size-1> iterations.
Set rampUp/rampDown to functions to provide arbitrary partial average
numbers during those phases. The callback will get the currently
available input data in a deque. Do not modify that data.
"""
d = deque()
running_sum = 0.0
data = iter(data)
# rampUp
for count in range(1, size):
try:
val = next(data)
except StopIteration:
break
running_sum += val
d.append(val)
#print("up: running sum:" + str(running_sum) + " count: " + str(count) + " deque: " + str(d))
if rampUp:
if callable(rampUp):
yield rampUp(d)
else:
yield running_sum / size
# steady
exhausted_early = True
for val in data:
exhausted_early = False
running_sum += val
#print("st: running sum:" + str(running_sum) + " deque: " + str(d))
yield running_sum / size
d.append(val)
running_sum -= d.popleft()
# rampDown
if rampDown:
if exhausted_early:
running_sum -= d.popleft()
for (count) in range(min(len(d), size-1), 0, -1):
#print("dn: running sum:" + str(running_sum) + " deque: " + str(d))
if callable(rampDown):
yield rampDown(d)
else:
yield running_sum / size
running_sum -= d.popleft()
这个版本似乎比Martijn的版本快一点,虽然后者更优雅。这里是测试代码:
print("")
print("Timeit")
print("-" * 80)
from itertools import islice
def window(seq, n=2):
"Returns a sliding window (of width n) over data from the iterable"
" s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... "
it = iter(seq)
result = tuple(islice(it, n))
if len(result) == n:
yield result
for elem in it:
result = result[1:] + (elem,)
yield result
# Martijn's version:
def moving_averages_SO(values, size):
for selection in window(values, size):
yield sum(selection) / size
import timeit
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)]
for problem_size in problems:
print("{:12s}".format(str(problem_size)), end="")
so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size,
setup="from __main__ import moving_averages_SO")
print("{:12.3f} ".format(min(so)), end="")
my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size,
setup="from __main__ import moving_averages")
print("{:12.3f} ".format(min(my)), end="")
print("")
输出结果是:
Timeit
--------------------------------------------------------------------------------
10 7.242 7.656
100 5.816 5.500
1000 5.787 5.244
10000 5.782 5.180
100000 5.746 5.137
1000000 5.745 5.198
10000000 5.764 5.186
现在可以通过这个函数调用来解决原始问题:
print(list(moving_averages(range(1,11), 5,
rampUp=lambda _: None,
rampDown=False)))
输出结果:
[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
23
在旧版的Python文档中,有一个很棒的滑动窗口生成器,里面有一些itertools
的例子:
from itertools import islice
def window(seq, n=2):
"Returns a sliding window (of width n) over data from the iterable"
" s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... "
it = iter(seq)
result = tuple(islice(it, n))
if len(result) == n:
yield result
for elem in it:
result = result[1:] + (elem,)
yield result
使用这个方法,计算移动平均值就变得非常简单:
from __future__ import division # For Python 2
def moving_averages(values, size):
for selection in window(values, size):
yield sum(selection) / size
把这个应用到你的输入数据上(把字符串转换成整数)会得到:
>>> y= ['1', '2', '3', '4','5','6','7','8','9','10']
>>> for avg in moving_averages(map(int, y), 5):
... print(avg)
...
3.0
4.0
5.0
6.0
7.0
8.0
为了在前面n - 1
次迭代中返回None
,以处理“不完整”的数据集,只需要稍微扩展一下moving_averages
函数:
def moving_averages(values, size):
for _ in range(size - 1):
yield None
for selection in window(values, size):
yield sum(selection) / size