使用(spark、python、pyspark、jupyter)将多个项目保存到HDFS

2024-03-28 11:11:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我习惯用Python编程。我的公司现在安装了一个安装了Jupyter的Hadoop集群。到现在为止,我从来没有用过Spark/Pyspark。在

我可以轻松地从HDFS加载文件:

text_file = sc.textFile("/user/myname/student_grades.txt")

我可以这样写输出:

^{pr2}$

我试图实现的是使用一个简单的“for循环”逐个读取文本文件并将其内容写入一个HDFS文件。所以我试了一下:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file.saveAsTextFile("/user/myname/all.txt")

因此,这适用于列表的第一个元素,但随后会显示以下错误消息:

Py4JJavaError: An error occurred while calling o714.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory 
XXXXXXXX/user/myname/all.txt already exists

为了避免混淆,我“脱口而出”—用XXXXXXXX说出IP地址。


正确的方法是什么? 我将有大量的数据集(如'text1'、'text2'…)并希望在将它们保存到HDFS之前对每个数据集执行python函数。但我想把结果都放在“一个”输出文件中。在

非常感谢!
毫克

编辑: 似乎我的最终目标并不明确。我需要对每个文本文件分别应用一个函数,然后我想将输出附加到现有的输出目录中。像这样:

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file = really_cool_python_function(text_file)
    text_file.saveAsTextFile("/user/myname/all.txt")

Tags: 文件texttxtforhdfsalllistfile
3条回答

如果文本文件都有相同的模式,那么可以使用Hive将整个文件夹作为一个表读取,并直接写入输出。在

您可以读取多个文件并通过

textfile = sc.textFile(','.join(['/user/myname/'+f for f in list]))
textfile.saveAsTextFile('/user/myname/all')

您将获得输出目录中的所有部件文件。在

我想把这篇文章作为评论发表,但由于我没有足够的声誉而无法发表。

您必须将RDD转换为dataframe,然后以追加模式写入。要将RDD转换为dataframe,请查看以下答案:
https://stackoverflow.com/a/39705464/3287419
或者这个链接http://spark.apache.org/docs/latest/sql-programming-guide.html
要在下面的附加模式下保存数据帧,链接可能很有用:
http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes

几乎相同的问题也在这里Spark: Saving RDD in an already existing path in HDFS。但答案是针对scala的。我希望类似的事情也能在python中实现。

还有另一种方法(但很难看)。将RDD转换为字符串。让结果字符串为resultString。使用子进程将该字符串附加到目标文件,即

subprocess.call("echo "+resultString+" | hdfs dfs -appendToFile - <destination>", shell=True)

相关问题 更多 >