我的原始数据是表格格式的。它包含来自不同变量的观察结果。每个观察值都有变量名、时间戳和当时的值。
Variable [string], Time [datetime], Value [float]
数据以拼花的形式存储在HDFS中,并加载到Spark数据帧(df)中。从那个数据框。
现在我要计算每个变量的默认统计数据,如平均值、标准差和其他。之后,一旦检索到平均值,我想过滤/计算那些接近平均值的变量值。
因此我需要先得到每个变量的平均值。这就是为什么我使用GroupBy来获取每个变量(而不是整个数据集)的统计信息。
df_stats = df.groupBy(df.Variable).agg( \
count(df.Variable).alias("count"), \
mean(df.Value).alias("mean"), \
stddev(df.Value).alias("std_deviation"))
用每个变量的平均值,我就可以过滤那些值(仅仅是计数),这些值是在平均值附近的特定变量。因此我需要这个变量的所有观测值。这些值在原始数据框中,而不是在聚合/分组数据框中。
最后,我想要一个数据帧,如聚合/分组的df_stats和一个新列count_arou mean“。
我在考虑使用df-stats.map(…)或df-stats.join(df,df.Variable)。但我被红色箭头困住了
问题:你如何意识到这一点?
临时解决方案:同时,我正在使用基于您想法的解决方案。但是stddev range 2和3的range函数不起作用。它总是产生一个
AttributeError saying NullType has no _jvm
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")
def stddev_pop_w(col, w):
#Built-in stddev doesn't support windowing
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
def isInRange(value, mean, stddev, radius):
try:
if (abs(value - mean) < radius * stddev):
return 1
else:
return 0
except AttributeError:
return -1
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())
df \
.withColumn("mean", mean("Value").over(w1)) \
.withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
.withColumn("delta", delta)
.withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
.withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
.show(5, False)
#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \
火花2.0+:
您可以用一个内置的
pyspark.sql.functions.stddev*
函数替换stddev_pop_w
。火花<;2.0:
一般来说,不需要使用join进行聚合。相反,您可以使用窗口函数计算统计信息而不折叠行。假设您的数据如下所示:
您可以通过
variable
定义具有分区的窗口:统计如下:
仅用于比较聚合与联接:
相关问题 更多 >
编程相关推荐