Spark上下文文本文件:加载多个文件

24 投票
4 回答
37511 浏览
提问于 2025-04-18 04:56

我需要处理分散在不同文件夹中的多个文件。我想把这些文件都加载到一个单独的RDD中,然后进行map/reduce操作。我发现SparkContext可以使用通配符从一个文件夹中加载多个文件,但我不太确定怎么从多个文件夹中加载文件。

下面的代码片段运行失败:

for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

在第三次循环时出现了以下错误信息:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

这很奇怪,因为我只提供了两个参数。希望能得到一些指点。

4 个回答

1

你可以使用这个

首先,你可以获取一个包含S3路径的缓冲区/列表:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

现在把这个列表对象传递给下面的代码,注意:sc是SQLContext的一个对象

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

现在你得到了一个最终的统一RDD,也就是df

可选的,你还可以把它重新分区成一个大的RDD

val files = sc.textFile(filename, 1).repartition(1)

重新分区总是有效的 :D

3

你可以使用SparkContext的以下功能:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

这个功能可以从HDFS、本地文件系统(在所有节点上都可以用)或者任何支持Hadoop的文件系统URI中读取一整个目录的文本文件。每个文件会被当作一个完整的记录来读取,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

13

我通常通过使用通配符来解决类似的问题。

比如,我在想要在Spark中加载的文件里发现了一些特征,

目录

子目录1/文件夹1/x.txt

子目录2/文件夹2/y.txt

你可以使用下面这句话

sc.textFile("dir/*/*/*.txt")

来加载所有相关的文件。

需要注意的是,通配符 '*' 只在单层目录中有效,也就是说它不会递归到子目录里。

44

那这样说怎么样呢?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

在Scala中,SparkContext.union()有两种用法,一种是可以接收多个参数,另一种是接收一个列表。只有第二种用法在Python中存在(因为Python不支持多态)。

更新

你可以通过一次textFile调用来读取多个文件。

sc.textFile(','.join(files))

撰写回答