如何在PySpark结构化流媒体中替换每组缺失的值?

2024-06-09 09:33:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用PySpark结构化流媒体,我想用我在过去X秒内看到的组中所有值的最大值替换列中缺少的值。我怎样才能做到这一点(在Pypark)?你知道吗

示例:

输入

ts  key value
0   a   2
0   b   5
1   a   1
1   b   7
2   a   NULL
2   b   6
3   a   3
3   b   NULL

输出

ts  key value
0   a   2
0   b   5
1   a   1
1   b   7
2   a   2      = max(2,1)
2   b   6
3   a   3
3   b   7      = max(5,7,6)

我认为一个好的解决方案是通过mapGroupsWithState使用有状态转换。不幸的是,这个特性似乎还没有出现在PySpark中。你知道吗

我已经尝试过使用窗口聚合,但是当前的解决方案不起作用(见下文)。我需要这样的东西:

streamingDf \
  .groupBy(window("ts", "10 seconds", "1 second"), col("key")) \
  .agg(when(col("value").eqNullSafe(None)), max("value")) \
  .otherwise(col("value")) \
  .alias("value"))

Tags: key示例value状态col特性解决方案null