我的原始数据是表格格式的。它包含来自不同变量的观察结果。每个观察值都有变量名、时间戳和当时的值。
Variable [string], Time [datetime], Value [float]
数据以拼花的形式存储在HDFS中,并加载到Spark数据帧(df)中。从那个数据框。
现在我要计算每个变量的默认统计数据,如平均值、标准差和其他。之后,一旦检索到平均值,我想过滤/计算那些接近平均值的变量值。
由于对我的other question的回答,我想出了以下代码:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")
def stddev_pop_w(col, w):
#Built-in stddev doesn't support windowing
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
def isInRange(value, mean, stddev, radius):
try:
if (abs(value - mean) < radius * stddev):
return 1
else:
return 0
except AttributeError:
return -1
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
#f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
#f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())
df_ = df_all \
.withColumn("mean", mean("Value").over(w1)) \
.withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
.withColumn("delta", delta) \
# .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
# .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
#df2.show(5, False)
问题:最后两行注释无效。它将给出attributeRor,因为stddev和mean的传入值为空。我想这是因为我所指的列也是动态计算的,在那一刻没有值。但有没有办法做到这一点?
目前我正在做第二次这样的跑步:
df = df_.select("*", \
abs(df_.Value - df_.mean).alias("max_deviation_mean"), \
when(abs(df_.Value - df_.mean) < 2 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_2"), \
when(abs(df_.Value - df_.mean) < 3 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_3"))
这无法工作,因为当您执行
您使用
pyspark.sql.functions.abs
对内置的abs
进行阴影处理,它需要一个列而不是本地Python值作为输入。另外,您创建的UDF不处理
NULL
条目。不要使用
import *
,除非你知道什么是进口的。取而代之的是别名或导入模块
一定要检查UDF中的输入,除非有必要,否则最好避免使用UDF。
解决方案是使用DataFrame.aggregateByKey函数,该函数在将每个分区和节点的值合并为一个结果值的计算节点周围的聚合进行无序处理之前聚合每个分区和节点的值。
伪代码如下所示。它的灵感来自this tutorial,但是它使用了StatCounter的两个实例,尽管我们同时总结了两个不同的统计数据:
相关问题 更多 >
编程相关推荐