Spark高效groupby操作重新划分?

2024-04-23 23:00:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我在pyspark2.3中工作,试图找出从数据帧中获取一些聚合统计数据的最有效方法。在

我有一个包含15亿条记录的数据帧,分布在一个由10个节点组成的相对较小的集群上。每个都有16gb的ram和4个核心。我的复制因子设置为2。在

我的dataframe可能有15列,这是数据类型的混合,但我只对两列感兴趣——ID和eventDate。我想运行的代码非常简单:

output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet',mode='overwrite')

我想知道的是做这个手术最有效的方法。ID是我分组依据的字段,有12m值,并且df.rdd.getNumPartitions()当前为642。在

我最好先把我的数据帧投射到我想要的两列上吗?有这么多ID,我应该先重新划分数据集吗?我应该删除副本吗?我可以在groupby之前运行这样的程序:

^{pr2}$

或者

df = df[['ID','eventDate']].repartition(x)

我正在努力弄清楚什么可以优化运行时。任何关于预先确定运行时的指导都将不胜感激。如果可能的话,我不希望只是“测试一下”,因为我要运行几个这样的查询,每个查询都需要一段时间。在


Tags: 数据方法iddf核心output节点记录
2条回答

它可能不是您所要寻找的答案,但此操作的最佳代码正是

output = df.groupby(['ID']). \
 agg(F.min('eventDate').alias("firstDate"), F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet', mode='overwrite')

Spark只需首先选择整个操作所需的色谱柱,从而优化工艺。然后,Spark按ID对数据进行分区,并在每个分区上启动聚合过程。在

允许最大数量的执行器肯定会有帮助。我建议(根据您的描述)设置spark.executor.instances=10; spark.executor.memory=10g。12m值是一个相当大的数量,也许可以尝试增加shuffle分区的数量,例如spark.sql.shuffle.partitions=400,这样你就不会得到一些恼人的内存开销异常。在

@飞肉丸子

在进行聚合之前,请执行以下步骤

1-丢弃不需要的数据(它会吃掉你的资源)。在

2-根据数据重新分区和缓存数据(这将消除执行时间)

提示:如果数据来自Cassandra,则按分区键重新分区数据,以避免数据混乱

现在您可以使用聚合逻辑;)

谢谢,
维姆莱什

相关问题 更多 >