Spark中的Kmeans
以下是用Apache Spark编写的Kmeans算法的一部分:
closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
pointStats = closest.reduceByKey(lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
newPoints = pointStats.map(lambda (x, (y, z)): (x, y / z)).collect()
有没有人能给我解释一下它是怎么工作的?假设我们有两个聚类和1000个点,我们想在一个有两个从节点和一个主节点的集群中运行它。我认为第一个函数(closest)可以看作是映射器,第二个函数可以看作是合并器,但最后一个函数应该做什么呢?哪个是归约器?
2 个回答
如果你想理解Spark代码,首先要了解k-means算法。简单来说,k-means的步骤如下:
- 随机选择k个点作为你聚类的中心点
- 确定你的数据点属于哪个由这k个中心点定义的聚类
- 根据每个聚类中点的平均值选择新的中心点
- 重复步骤2和3,直到你觉得可以停止
你提到的第一行代码是算法的步骤(2);而第二行和第三行代码是算法的步骤(3)。这两行代码的作用是计算一个聚类中所有点的平均值。
最后的'collect'方法调用是将数据从RDD(弹性分布式数据集)转移到本地机器上(为将新的中心点广播到Spark环境中的所有节点做准备)。为了重复步骤(2)和(3),你需要把当前的中心点信息分发到每个节点。
你在使用 reduceByKey
的时候,需要给它传一个可以用来合并和减少数据的函数,因为这个方法要求你提供一个聚合函数。如果你的需求不适合用合并函数,那就得用 groupByKey
。另外,每次你在 Spark 的 RDD 上调用 map
时,你传入的函数可以看作是一个映射器。建议你去看看 RDD
文档 和 PairRDDFunctions
。要记住,Spark 程序通常会有多个映射和减少的阶段,因为它会尽量把中间结果保存在内存中,而标准的 Hadoop MapReduce 每次都要从磁盘读取和写入数据。如果你在使用 Spark,还可以用 MLlib 中的 k-means。
更新:
关于你的评论,他们之所以“把(总和 / 点数)映射到每个从节点”,是因为 Spark 的工作方式使得这样做没有额外的开销。由于 Spark 为每个 RDD 使用有向无环图(DAG),在执行一个动作(比如这里的 collect()
)之前,什么都不会被计算,所以最后的映射可以无缝地获取到减少器的输出,而这个输出应该不会溢出到磁盘,因为它非常小。这和 Hadoop 中的 ChainReducer
有点类似,但在 Spark 中,连接的 RDD 的每一步都是保存在内存中的(当然这并不总是可能,所以有时会溢出到磁盘,这也取决于序列化的级别)。所以基本上,最后的计算实际上是在和减少器同一个节点上完成的(之后不需要再进行数据洗牌),然后再收集到驱动程序中。