import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()
它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果需要单个输出文件(仍在文件夹中),可以
repartition
(如果上游数据很大,但需要洗牌,则首选):或
coalesce
:保存前数据帧:
所有数据都将写入
mydata.csv/part-00000
。在使用此选项之前,请确保您了解正在进行的操作,以及将所有数据传输到单个工人的成本。如果将分布式文件系统与复制一起使用,则数据将被多次传输—首先提取到单个工作机,然后在存储节点上分发。或者,您可以保持代码原样,然后使用诸如} 之类的通用工具简单地合并所有部分。
cat
或HDFS ^{如果你用HDFS运行Spark,我已经通过正常地编写csv文件和利用HDFS进行合并来解决了这个问题。我在Spark(1.6)中直接这么做:
我不记得我从哪儿学的这个把戏,但可能对你有用。
在这里,我可能玩得有点晚,但是使用
coalesce(1)
或repartition(1)
可能对小数据集有效,但是大数据集都会被抛出到一个节点上的一个分区中。这很可能会抛出OOM错误,或者最多是处理得慢一些。我强烈建议您使用Hadoop API中的^{} 函数。这将把输出合并到一个文件中。
编辑-这有效地将数据带到驱动程序而不是执行器节点。
Coalesce()
如果一个执行器有比驱动程序更多的RAM可用,那就没问题了。编辑2:
copyMerge()
在Hadoop 3.0中被删除。有关如何使用最新版本的详细信息,请参阅下面的堆栈溢出文章:How to do CopyMerge in Hadoop 3.0?相关问题 更多 >
编程相关推荐