如何有效地将spark中的数据帧与小文件目录连接起来?

2024-04-29 19:34:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个数据帧(df1),其中有id, date, type列。我需要用一个文件/descriptions/{id}.txt连接它。联接的结果应该是(df1)中所有条目的数据帧id, date, type, description

我们的目标是使用这个预处理的数据帧进行进一步的分析,这样我就不必再处理小文件了

值得注意的是:小的描述文件比我需要的要多得多(x1000),所以我认为某种“惰性连接”比预先读取所有小文件然后连接更有效

您将如何在Spark中构建此功能?我目前使用scala,但如果您有python示例,我想我也可以使用它


Tags: 文件数据目的功能txtid目标date
1条回答
网友
1楼 · 发布于 2024-04-29 19:34:19

选项1,读取所有文件:

您可以使用wholeTextFiles阅读说明。它返回一个RDD,将文件路径映射到它们的内容,然后将结果与数据帧连接起来

val descriptions = sc
    .wholeTextFiles(".../descriptions")
    // We need to extract the id from the file path
    .map{ case (path, desc) => {
        val fileName = path.split("/").last
        val id = "[0-9]+".r.findFirstIn(fileName)
        id.get.toLong -> desc
    }}
    .toDF("id", "description")

val result = df1.join(descriptions, Seq("id"))

选项2,仅读取所需文件

为此,您可以使用binaryFiles。它创建一个RDD,将每个文件路径映射到DataStream。因此,不会立即读取这些文件。然后,您可以从df1中选择所有不同的id,将它们与RDD连接起来,然后只读取所需文件的内容。代码如下所示:

val idRDD = df1
    .select("id").distinct
    .rdd.map(_.getAs[Long]("id") -> true)

val descriptions = sc.binaryFiles(".../descriptions")
    // same as before, but the description is not read yet
    .map{ case (path, descFile) => {
        val fileName = path.split("/").last
        val id = "[0-9]+".r.findFirstIn(fileName)
        id.get.toLong -> descFile
    }} // inner join with the ids we are interested in
    .join(idRDD)
    .map{ case(id, (file, _)) => id -> file}
    // reading the files
    .mapValues(file => {
         val reader = scala.io.Source.fromInputStream(file.open)
         val desc = reader.getLines.mkString("\n")
         reader.close
         desc
    })
    .toDF("id", "description")

val result = df1.join(descriptions, Seq("id"))

相关问题 更多 >