RDD的切片和分区有什么区别?
我正在使用Spark的Python接口,并且运行的是Spark 0.8版本。
我存储了一个很大的浮点向量的RDD,现在需要对一个向量和整个集合进行计算。
在RDD中,切片和分区有什么区别吗?
当我创建这个RDD时,我传入了100这个参数,这样就把RDD分成了100个切片,并在计算时创建了100个任务。我想知道,数据的分区是否能比切片更提高性能,让系统更有效率地处理数据(也就是说,分区操作和在切片的RDD上对每个元素进行操作之间有什么不同)。
举个例子,这两段代码之间有没有什么显著的区别呢?
rdd = sc.textFile(demo.txt, 100)
与
rdd = sc.textFile(demo.txt)
rdd.partitionBy(100)
2 个回答
你可以这样进行分区:
import org.apache.spark.Partitioner
val p = new Partitioner() {
def numPartitions = 2
def getPartition(key: Any) = key.asInstanceOf[Int]
}
recordRDD.partitionBy(p)
我认为在Apache Spark中,slices
和partitions
是一样的东西。
不过,你发的那两段代码之间有一个微妙但可能很重要的区别。
第一段代码会尝试直接把demo.txt
加载到100个分区中,使用100个并行任务:
rdd = sc.textFile('demo.txt', 100)
对于未压缩的文本,这样做是没问题的。但是如果你用的是demo.gz
,那么最后你得到的RDD只有1个分区。因为读取压缩文件是无法并行处理的。
而第二段代码会先把demo.txt
打开成一个默认分区数量的RDD,然后再把数据明确地重新分成大约相等的100个分区。
rdd = sc.textFile('demo.txt')
rdd = rdd.repartition(100)
所以在这种情况下,即使是demo.gz
,你也会得到一个有100个分区的RDD。
顺便提一下,我把你的partitionBy()
换成了repartition()
,因为我觉得你想要的是这个。partitionBy()
要求RDD是一个元组的RDD。而且由于repartition()
在Spark 0.8.0中不可用,你可以使用coalesce(100, shuffle=True)
来代替。
Spark可以为每个RDD的分区运行1个并行任务,最多到你集群中的核心数。所以如果你的集群有50个核心,你希望你的RDD至少有50个分区(可能还要多2-3倍,具体可以参考这里)。
从Spark 1.1.0开始,你可以这样检查一个RDD有多少个分区:
rdd.getNumPartitions() # Python API
rdd.partitions.size // Scala API
在1.1.0之前,使用Python API的方法是rdd._jrdd.splits().size()
。