(PySpark)reduceByKey之后的嵌套列表

2024-04-28 01:38:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我相信这是很简单的事情,但我没有发现任何与此相关的东西。

我的代码很简单:

... 
stream = stream.map(mapper) 
stream = stream.reduceByKey(reducer) 
... 

没什么特别的。输出如下:

... 
key1  value1 
key2  [value2, value3] 
key3  [[value4, value5], value6] 
... 

等等。所以,有时我得到一个单位值(如果它是单值的话)。有时,嵌套列表可能非常非常深入(在我的简单测试数据中,深度为3层)。

我试着搜索类似“flat”的源代码,但只找到了flatMap方法(据我所知)不是我需要的。

我不知道为什么这些列表是嵌套的。我猜他们是由不同的流程(工人?)然后连在一起,没有压扁。

当然,我可以用Python编写一个代码来展开列表并将其展平。但我相信这不是一个正常的情况-我认为几乎每个人都需要一个单位产出。

找到不可迭代的值时,itertools.chain停止展开。换句话说,它仍然需要一些编码(前一段)。

那么-如何使用PySpark的本机方法平展列表?

谢谢


Tags: 方法代码map列表stream单位事情key2
2条回答

这里的问题是你的reduce函数。对于每个键,reduceByKey使用成对的值调用reduce函数,并期望它生成相同类型的组合值。

例如,假设我想执行字数运算。首先,我可以将每个单词映射到一个(word, 1)对,然后我可以reduceByKey(lambda x, y: x + y)来总结每个单词的计数。最后,我剩下的是(word, count)对的RDD。

下面是来自PySpark API Documentation的示例:

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

为了理解示例为什么不起作用,您可以想象reduce函数被应用如下:

reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...

基于reduce函数,听起来您可能正在尝试实现内置的^{}操作,该操作使用其值列表对每个键进行分组。

另外,请看^{},这是reduceByKey()的一个泛化,它允许reduce函数的输入和输出类型不同(reduceByKeyimplemented,用combineByKey表示)

或者,stream.groupByKey().mapValues(lambda x: list(x)).collect()给出

key1 [value1]
key2 [value2, value3]
key3 [value4, value5, value6]

相关问题 更多 >