Apache Spark与Python Lambda

8 投票
3 回答
31535 浏览
提问于 2025-04-18 12:07

我有以下代码

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

http://spark.apache.org/examples.html 我是从这里复制的例子

我对这段代码有点搞不懂,特别是以下几个关键词

  1. flatmap,
  2. map 和
  3. reduceby

能不能用简单的语言解释一下这些在干什么。

3 个回答

1

这里的回答在代码层面上是准确的,但了解背后的原理可能会更有帮助。

我的理解是,当调用一个“归约”(reduce)操作时,会发生大量的数据重新分配。这意味着通过“映射”(map)操作得到的所有键值对(K-V对),如果它们的键值相同,就会被分配到一个任务中,这个任务会对这些键值对的值进行求和。然后,这些任务会被分配到不同的物理处理器上,最后结果会通过另一轮数据重新分配来整理。

举个例子,如果“映射”操作的结果是: (cat 1) (cat 1) (dog 1) (cat 1) (cat 1) (dog 1)

那么“归约”操作的结果就是: (cat 4) (dog 2)

希望这能帮到你。

5

请查看内联评论:

file = spark.textFile("hdfs://...") # opens a file
counts = file.flatMap(lambda line: line.split(" ")) \  # iterate over the lines, split each line by space (into words)
             .map(lambda word: (word, 1)) \ # for each word, create the tuple (word, 1)
             .reduceByKey(lambda a, b: a + b) # go over the tuples "by key" (first element) and sum the second elements
counts.saveAsTextFile("hdfs://...")

关于reduceByKey的更详细解释可以在 这里 找到

16

map 是最简单的,它的意思是对序列中的每个元素执行给定的操作,并返回处理后的结果序列(这和 foreach 很像)。flatMap 也是做类似的事情,但它允许你对每个元素返回一个序列(这个序列可以是空的),而不仅仅是一个元素。这里有一个回答解释了 mapflatMap 之间的区别。最后,reduceByKey 接受一个聚合函数(这意味着它需要两个相同类型的参数并返回这个类型,且应该是可交换和结合的,否则你会得到不一致的结果),用于对序列中的每个 (K,V) 对中的每个 K 聚合所有的 V

示例*:
reduce (lambda a, b: a + b,[1,2,3,4])

这表示用 + 来聚合整个列表,所以它会执行

1 + 2 = 3  
3 + 3 = 6  
6 + 4 = 10  
final result is 10

按键聚合是类似的,只不过你是对每个唯一的键进行聚合。


所以在你的例子中解释一下

file = spark.textFile("hdfs://...") // open text file each element of the RDD is one line of the file
counts = file.flatMap(lambda line: line.split(" ")) //flatMap is needed here to return every word (separated by a space) in the line as an Array
             .map(lambda word: (word, 1)) //map each word to a value of 1 so they can be summed
             .reduceByKey(lambda a, b: a + b) // get an RDD of the count of every unique word by aggregating (adding up) all the 1's you wrote in the last step
counts.saveAsTextFile("hdfs://...") //Save the file onto HDFS

那么,为什么要这样计算单词数量呢?原因是 MapReduce 编程范式可以高度并行化,因此可以扩展到处理数TB甚至PB的数据。


我不太用 Python,如果我犯了错误请告诉我。

撰写回答