如何使用PySp执行嵌套foreach循环

2024-06-01 03:53:44 发布

您现在位置:Python中文网/ 问答频道 /正文

假设一个大型数据集(>;40GB拼花文件)包含数千个变量的值观测值,其大小为三倍(变量、时间戳、值)

现在考虑一个查询,您只对500个变量的子集感兴趣。您需要检索特定时间点(观察窗口或时间框架)的这些变量的观察值(值-->;时间序列)。这样有开始和结束的时间。

如果没有分布式计算(Spark),您可以这样编写:

for var_ in variables_of_interest:
    for incident in incidents:

        var_df = df_all.filter(
            (df.Variable == var_)
            & (df.Time > incident.startTime)
            & (df.Time < incident.endTime))

我的问题是:如何使用Spark/PySpark?我在想:

  1. 以某种方式将事件与变量连接起来,然后过滤数据帧。
  2. 广播事件数据帧,并在过滤变量观测值(df_all)时在map函数中使用它。
  3. 以某种方式使用RDD.cartasian或RDD.mapParitions(备注:拼花文件是按变量分区保存的)。

预期产出应为:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

其中,数据帧1包含事件1和数据帧2的时间范围内的所有变量及其观测值,这些值在事件2的时间范围内。

我希望你有这个想法。

更新

我试图根据idea#1和zero323给出的答案编写解决方案的代码。工作很好,但我想知道如何在最后一步将其聚合/组合到事件中?我试着给每个事件添加一个序列号,但在最后一步中出错了。如果你能检查和/或完成代码,那就太酷了。因此我上传了样本数据和脚本。环境是Spark 1.4(PySpark):


Tags: 文件数据gt脚本loopdfforvar