我在pyspark2.3中工作,试图找出从数据帧中获取一些聚合统计数据的最有效方法。在
我有一个包含15亿条记录的数据帧,分布在一个由10个节点组成的相对较小的集群上。每个都有16gb的ram和4个核心。我的复制因子设置为2。在
我的dataframe可能有15列,这是数据类型的混合,但我只对两列感兴趣——ID和eventDate。我想运行的代码非常简单:
output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet',mode='overwrite')
我想知道的是做这个手术最有效的方法。ID是我分组依据的字段,有12m值,并且df.rdd.getNumPartitions()当前为642。在
我最好先把我的数据帧投射到我想要的两列上吗?有这么多ID,我应该先重新划分数据集吗?我应该删除副本吗?我可以在groupby之前运行这样的程序:
^{pr2}$或者
df = df[['ID','eventDate']].repartition(x)
我正在努力弄清楚什么可以优化运行时。任何关于预先确定运行时的指导都将不胜感激。如果可能的话,我不希望只是“测试一下”,因为我要运行几个这样的查询,每个查询都需要一段时间。在
它可能不是您所要寻找的答案,但此操作的最佳代码正是
Spark只需首先选择整个操作所需的色谱柱,从而优化工艺。然后,Spark按
ID
对数据进行分区,并在每个分区上启动聚合过程。在允许最大数量的执行器肯定会有帮助。我建议(根据您的描述)设置
spark.executor.instances=10; spark.executor.memory=10g
。12m值是一个相当大的数量,也许可以尝试增加shuffle分区的数量,例如spark.sql.shuffle.partitions=400
,这样你就不会得到一些恼人的内存开销异常。在@飞肉丸子
在进行聚合之前,请执行以下步骤
1-丢弃不需要的数据(它会吃掉你的资源)。在
2-根据数据重新分区和缓存数据(这将消除执行时间)
提示:如果数据来自Cassandra,则按分区键重新分区数据,以避免数据混乱
现在您可以使用聚合逻辑;)
谢谢,
维姆莱什
相关问题 更多 >
编程相关推荐