RDD的切片和分区有什么区别?

12 投票
2 回答
5657 浏览
提问于 2025-04-18 10:06

我正在使用Spark的Python接口,并且运行的是Spark 0.8版本。

我存储了一个很大的浮点向量的RDD,现在需要对一个向量和整个集合进行计算。

在RDD中,切片和分区有什么区别吗?

当我创建这个RDD时,我传入了100这个参数,这样就把RDD分成了100个切片,并在计算时创建了100个任务。我想知道,数据的分区是否能比切片更提高性能,让系统更有效率地处理数据(也就是说,分区操作和在切片的RDD上对每个元素进行操作之间有什么不同)。

举个例子,这两段代码之间有没有什么显著的区别呢?

rdd = sc.textFile(demo.txt, 100)

rdd = sc.textFile(demo.txt)
rdd.partitionBy(100)

2 个回答

2

你可以这样进行分区:

import org.apache.spark.Partitioner

val p = new Partitioner() {
  def numPartitions = 2
  def getPartition(key: Any) = key.asInstanceOf[Int]
}
recordRDD.partitionBy(p)
24

我认为在Apache Spark中,slicespartitions是一样的东西。

不过,你发的那两段代码之间有一个微妙但可能很重要的区别。

第一段代码会尝试直接把demo.txt加载到100个分区中,使用100个并行任务:

rdd = sc.textFile('demo.txt', 100)

对于未压缩的文本,这样做是没问题的。但是如果你用的是demo.gz,那么最后你得到的RDD只有1个分区。因为读取压缩文件是无法并行处理的。

而第二段代码会先把demo.txt打开成一个默认分区数量的RDD,然后再把数据明确地重新分成大约相等的100个分区。

rdd = sc.textFile('demo.txt')
rdd = rdd.repartition(100)

所以在这种情况下,即使是demo.gz,你也会得到一个有100个分区的RDD。

顺便提一下,我把你的partitionBy()换成了repartition(),因为我觉得你想要的是这个。partitionBy()要求RDD是一个元组的RDD。而且由于repartition()在Spark 0.8.0中不可用,你可以使用coalesce(100, shuffle=True)来代替。

Spark可以为每个RDD的分区运行1个并行任务,最多到你集群中的核心数。所以如果你的集群有50个核心,你希望你的RDD至少有50个分区(可能还要多2-3倍,具体可以参考这里)。

从Spark 1.1.0开始,你可以这样检查一个RDD有多少个分区:

rdd.getNumPartitions()  # Python API
rdd.partitions.size     // Scala API

在1.1.0之前,使用Python API的方法是rdd._jrdd.splits().size()

撰写回答