使用spark cs编写单个CSV文件

2024-03-29 05:03:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用https://github.com/databricks/spark-csv,我正在尝试编写一个CSV,但不能,它正在创建一个文件夹。

需要一个Scala函数,该函数将接受path和file name等参数并写入CSV文件。


Tags: 文件csvpath函数namehttpsgithub文件夹
3条回答

它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果需要单个输出文件(仍在文件夹中),可以repartition(如果上游数据很大,但需要洗牌,则首选):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

coalesce

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

保存前数据帧:

所有数据都将写入mydata.csv/part-00000。在使用此选项之前,请确保您了解正在进行的操作,以及将所有数据传输到单个工人的成本。如果将分布式文件系统与复制一起使用,则数据将被多次传输—首先提取到单个工作机,然后在存储节点上分发。

或者,您可以保持代码原样,然后使用诸如catHDFS ^{}之类的通用工具简单地合并所有部分。

如果你用HDFS运行Spark,我已经通过正常地编写csv文件和利用HDFS进行合并来解决了这个问题。我在Spark(1.6)中直接这么做:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

我不记得我从哪儿学的这个把戏,但可能对你有用。

在这里,我可能玩得有点晚,但是使用coalesce(1)repartition(1)可能对小数据集有效,但是大数据集都会被抛出到一个节点上的一个分区中。这很可能会抛出OOM错误,或者最多是处理得慢一些。

我强烈建议您使用Hadoop API中的^{}函数。这将把输出合并到一个文件中。

编辑-这有效地将数据带到驱动程序而不是执行器节点。Coalesce()如果一个执行器有比驱动程序更多的RAM可用,那就没问题了。

编辑2copyMerge()在Hadoop 3.0中被删除。有关如何使用最新版本的详细信息,请参阅下面的堆栈溢出文章:How to do CopyMerge in Hadoop 3.0?

相关问题 更多 >