Pypark流媒体应用程序计算移动平均值

2024-04-29 16:05:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我的数据流包含一个时间戳和一个对应的值

3-10-14 1:06:05,9.74
3-10-14 1:08:02,10.44
3-10-14 1:09:20,9.83
3-10-14 1:11:53,10.49

我需要取10个元素的移动平均值。我一直在用函数探索foreachRDD选项,但是在浏览了spark文档之后没有成功,但是没有找到任何明确的指示。在


Tags: 函数文档元素选项时间spark数据流平均值
1条回答
网友
1楼 · 发布于 2024-04-29 16:05:57

假设您的数据在元组D的列表(time,value)。使用numpy命令mean(),10个元素(独立于时间戳)的移动平均值如下:

import numpy as np

def mov_avg(D):

    avg_list = []
    Dval = [item[1] for item in D] #extracts the values from D, ignoring time stamp
    L = len(D)

    for i in range(L):
        if i<10:
            l = Dval[:i+1]
            M = np.mean(l)
            avg_list.append(M)

        else:
            l = Dval[i-9:i+1]
            M = np.mean(l)
            avg_list.append(M)

    return avg_list

此函数将返回一个列表avg_list,这样avg_list[j]是数据D之前的10个值的平均值,包括第j+1个值。因此,如果我们设置A = mov_avg(D),那么A[12]将是{}值3到12的平均值(如果我们包括3和12,这是10个元素),而{}将是{}值0到6的平均值。在

可能有一个预先存在的命令,但这是您将如何编码它否则。在

相关问题 更多 >