为什么1行数据帧上的collect()使用2000个执行器?

2024-05-23 19:33:30 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我能想到的最简单的数据帧。我使用的是PySpark 1.6.1。在

# one row of data
rows = [ (1,   2) ]
cols = [ "a", "b" ]
df   = sqlContext.createDataFrame(rows, cols)

所以这个数据框完全可以放在内存中,没有任何文件的引用,在我看来非常微不足道。在

但当我收集数据时,它使用了2000个执行器:

^{pr2}$

在收集期间,使用2000个执行器:

[Stage 2:===================================================>(1985 + 15) / 2000]

然后是预期输出:

[Row(a=1, b=2)]

为什么会这样?数据帧不应该完全存储在驱动程序的内存中吗?在


Tags: 文件of数据内存dfdataone执行器
2条回答

您可以配置执行器的数量。在许多情况下,spark将尽可能多地使用执行器,并且执行时间比限制为少数执行器时要糟糕得多。在

conf = SparkConf()
conf.set('spark.dynamicAllocation.enabled','true')
conf.set('spark.dynamicAllocation.maxExecutors','32')

所以我研究了一下代码试图弄清楚到底发生了什么。似乎sqlContext.createDataFrame确实没有尝试根据数据设置合理的参数值。在

为什么要执行2000个任务?

Spark使用2000个任务,因为我的数据框有2000个分区。(尽管分区比行多似乎是一派胡言。)

具体表现在:

>>> df.rdd.getNumPartitions()
2000

为什么数据帧有2000个分区?

发生这种情况是因为sqlContext.createDataFrame最终使用默认的分区数(在我的例子中是2000),而不管数据是如何组织的或它有多少行。在

代码跟踪如下。在

sql/context.py中,sqlContext.createDataFrame函数调用(在本例中):

^{pr2}$

反过来又叫:

return self._sc.parallelize(data), schema

并且sqlContext.parallelize函数在context.py中定义:

numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism

不检查行数,也不可能指定sqlContext.createDataFrame中的片数。在

如何更改数据帧有多少个分区?

使用DataFrame.coalesce。在

>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]

相关问题 更多 >