使用Python中的Spark RDD进行foreach循环

2024-03-29 05:24:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试在集群上运行一个非常大的RDD并将其写入.csv。它太大了,.collect()会中断,所以我希望将RDD保存到每个节点上的各个部分,然后以某种方式将它们组合在一起,因为顺序无关紧要。我的想法是将foreach与CSV打印机功能结合使用,这样每个部分都会写入它的值,然后我可以手动收集这些部分,也许是通过FTP。

我是一个有经验的Spark用户,但是到目前为止,我还没有能够让RDD的foreach方法做任何有用的事情。当我试图运行文档中给出的示例时

>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

我的控制台上什么也没有。我认为这是因为“打印”是在单独的节点上执行的,而不是在控制台所在的namenode上执行的。不过,在这种情况下,我真的看不出foreach函数有什么意义!

如何在不首先调用collect()函数的情况下将for each的结果返回到name节点?

注意。我也可以使用saveAsTextFile()RDD函数,但我还是没能成功!它似乎创建了一个文件夹而不是一个文本文件,尽管这可能是因为它们存在于每个节点上而不是集中?


Tags: csv函数功能节点顺序方式打印机情况