我正在使用PySpark结构化流媒体,我想用我在过去X秒内看到的组中所有值的最大值替换列中缺少的值。我怎样才能做到这一点(在Pypark)?你知道吗
示例:
输入
ts key value
0 a 2
0 b 5
1 a 1
1 b 7
2 a NULL
2 b 6
3 a 3
3 b NULL
输出
ts key value
0 a 2
0 b 5
1 a 1
1 b 7
2 a 2 = max(2,1)
2 b 6
3 a 3
3 b 7 = max(5,7,6)
我认为一个好的解决方案是通过mapGroupsWithState
使用有状态转换。不幸的是,这个特性似乎还没有出现在PySpark中。你知道吗
我已经尝试过使用窗口聚合,但是当前的解决方案不起作用(见下文)。我需要这样的东西:
streamingDf \
.groupBy(window("ts", "10 seconds", "1 second"), col("key")) \
.agg(when(col("value").eqNullSafe(None)), max("value")) \
.otherwise(col("value")) \
.alias("value"))
目前没有回答
相关问题 更多 >
编程相关推荐