Pyspark中的宽数据帧操作

2024-06-16 11:29:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我是一个Spark新手,正在尝试使用pyspark(spark2.2)对非常广泛的特性集(大约1300万行,15000列)执行过滤和聚合操作。功能集作为拼花文件存储在S3驱动器上。我正在运行一个测试脚本来加载数据帧中的特性集,选择几千条记录,按特定的区域代码分组,并平均每个15k个特性列。问题是工作要么出错,要么耗时太长(对于5%的记录样本,大约8小时)。在

在Pyspark中,有没有什么方法可以加速对宽数据帧的这些操作?我使用的是Jupyter笔记本,希望这些查询在几分钟内完成,而不是几个小时。在

这是我的密码

df_feature_store = spark.read.parquet(PATH_FEATURE_STORE).sample(False, 0.05, seed=0).cache()
    logger.info("Initial data set loaded and sampled")

    df_selected_rors = spark.read.csv(PATH_DATA_SOURCE+"ROR Sample.csv", header=True)
    agg_cols = [x for x in df_feature_store.columns if re.search("^G\d{2}",x)]
    agg_cols = agg_cols[:10]  # just testing with fewer columns
    expr = {x:"mean" for x in agg_cols}
    joineddf = df_feature_store.join(df_selected_rors, df_feature_store.ROLLOUTREGION_IDENTIFIER == df_selected_rors.ROR, "inner")
    aggdf = joineddf.groupby("ROLLOUT_REGION_IDENTIFIER").agg(expr)
    # replace groupby
    # loop for a 1000 column aggregations 
    # transpose columns into rows as arrays
    aggdf.write.mode("overwrite").csv(PATH_FEATURE_STORE + "aggregated", header=True)
    logger.info("Done")`

Tags: columnscsv数据pathstoredffor记录
1条回答
网友
1楼 · 发布于 2024-06-16 11:29:02

我试着把它分开看看问题出在哪里

  • Spark的某些版本在DFs中有很多很多列的问题;我不记得具体的细节了。在
  • 从CSV读取并保存在Parquet本地,在任何查询之前,过滤列(如果可以的话)
  • 运行查询Parquet local-到Parquet local

作为工作目标的S3(a)提交速度慢,(b)由于S3的最终一致性而有丢失数据的风险。除非您使用的是S3mper/S3Guard/EMR一致的emrf,否则您不应该将其用作工作的直接目的地。在

相关问题 更多 >