我相信这是很简单的事情,但我没有发现任何与此相关的东西。
我的代码很简单:
...
stream = stream.map(mapper)
stream = stream.reduceByKey(reducer)
...
没什么特别的。输出如下:
...
key1 value1
key2 [value2, value3]
key3 [[value4, value5], value6]
...
等等。所以,有时我得到一个单位值(如果它是单值的话)。有时,嵌套列表可能非常非常深入(在我的简单测试数据中,深度为3层)。
我试着搜索类似“flat”的源代码,但只找到了flatMap方法(据我所知)不是我需要的。
我不知道为什么这些列表是嵌套的。我猜他们是由不同的流程(工人?)然后连在一起,没有压扁。
当然,我可以用Python编写一个代码来展开列表并将其展平。但我相信这不是一个正常的情况-我认为几乎每个人都需要一个单位产出。
找到不可迭代的值时,itertools.chain停止展开。换句话说,它仍然需要一些编码(前一段)。
那么-如何使用PySpark的本机方法平展列表?
谢谢
这里的问题是你的reduce函数。对于每个键,
reduceByKey
使用成对的值调用reduce函数,并期望它生成相同类型的组合值。例如,假设我想执行字数运算。首先,我可以将每个单词映射到一个
(word, 1)
对,然后我可以reduceByKey(lambda x, y: x + y)
来总结每个单词的计数。最后,我剩下的是(word, count)
对的RDD。下面是来自PySpark API Documentation的示例:
为了理解示例为什么不起作用,您可以想象reduce函数被应用如下:
基于reduce函数,听起来您可能正在尝试实现内置的^{} 操作,该操作使用其值列表对每个键进行分组。
另外,请看^{} ,这是
reduceByKey()
的一个泛化,它允许reduce函数的输入和输出类型不同(reduceByKey
是implemented,用combineByKey
表示)或者,
stream.groupByKey().mapValues(lambda x: list(x)).collect()
给出相关问题 更多 >
编程相关推荐