<p>如果你用HDFS运行Spark,我已经通过正常地编写csv文件和利用HDFS进行合并来解决了这个问题。我在Spark(1.6)中直接这么做:</p>
<pre><code>import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()
</code></pre>
<p>我不记得我从哪儿学的这个把戏,但可能对你有用。</p>