为什么我的Spark数据帧比RDD慢得多?

2024-03-29 08:14:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常简单的Spark DataFrame,当运行dataframegroupby时,性能非常糟糕-大约比(在我脑海中)等效的RDD reduceByKey慢8倍。。。在

我缓存的DF只有两列,customer和name只有50k行:

== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None

当我运行以下两个代码片段时,我希望性能相似,而不是rdd版本在10秒内运行,DF版本在85秒内运行。。。在

^{pr2}$

我是不是错过了一些真正重要的东西?FWIW,RDD版本运行54个阶段,DF版本是227:/

编辑:我使用的是spark1.6.1和python3.4.2。 编辑2:另外,源拼花被划分为customer/day/name-目前有27个客户,1天,大约45个名字。在


Tags: name版本true编辑dataframedfcustomer性能
1条回答
网友
1楼 · 发布于 2024-03-29 08:14:40

这两个数字似乎都比较高,而且还不清楚如何创建DataFrame或如何度量时间,但一般来说,这样的差异可以通过与分区数相比记录数较少来解释。在

{get}的默认值是cd200}中的任务数。对于50K条记录,启动一个任务的开销将高于并行执行所能获得的加速。让我们用一个简单的例子来说明这一点。首先,让我们创建一个示例数据:

import string
import random

random.seed(323)

def random_string():
  n = random.randint(3, 6)
  return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )

df = (sc
    .parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
    .cache())

并根据shuffle.partitions的数量测量时间:

^{pr2}$

虽然这些值与您声称的不可比,并且这些数据是在本地模式下收集的,但是您可以看到相对清晰的模式。这同样适用于RDD:

from operator import add

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
## 10 loops, best of 3: 414 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
## 10 loops, best of 3: 439 ms per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
## 10 loops, best of 3: 1.3 s per loop

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
## 10 loops, best of 3: 8.41 s per loop

在适当的分布式环境中,由于网络IO的成本,这将更高。在

为了进行比较,让我们检查在没有Spark的情况下本地执行此任务需要多长时间

from collections import Counter

data = df.rdd.flatMap(lambda x: x).collect()

%timeit -n 10 Counter(data)
## 10 loops, best of 3: 9.9 ms per loop

您还应该查看数据位置。根据您使用和配置的存储,这可能会给您的作业增加额外的延迟,即使这样的小输入也是如此。在

相关问题 更多 >