从S3存储桶读取大量CSV文件

2024-04-23 13:42:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我想从S3存储桶中读取大量csv文件。CSV文件位于不同的分区中。我正在使用Boto3列出csv的所有路径。然后使用for循环在列表上迭代,将csv文件读入spark dataframe。我需要一种更好的优化方法来从S3路径读取大量文件,因为循环是一种线性方法,需要花费很多时间才能完成。 列出所有对象:

self.all_objects = [file_path['Key'] for resp_content in self.s3.get_paginator("list_objects_v2").paginate(Bucket='bucketName') for file_path in resp_content['Contents']]

读取循环中的每个CSV文件:

csv_df = self.spark.read.format("csv").option("header", "true").load(s3_path)

此外,我想合并所有的数据框一起创建一个拼花地板文件

提前感谢如果有人有一个很好的解决方案,请建议


Tags: 文件csvpath方法inself路径for
1条回答
网友
1楼 · 发布于 2024-04-23 13:42:39

读取包含多个文件夹的csv文件(Spark+Scala):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sourcesFolders = List("/home/mykolavasyliv/tmp/source1/", "/home/mykolavasyliv/tmp/source2/", "/home/mykolavasyliv/tmp/source3/")

//  :~/tmp$ tree
//    .
//  ├── source1
//  │   └── person-data-1.csv
//  ├── source2
//  │   └── person-data-2.csv
//  └── source3
//      └── person-data-3.csv

//  person-data-1.csv:
//  source-1-1,Mykola ,23,100
//  source-1-2,Jon,34,76
//  source-1-3,Marry,25,123

//  person-data-2.csv
//  source-2-1,Mykola ,23,100
//  source-2-2,Jon,34,76
//  source-2-3,Marry,25,123

//  person-data-3.csv
//  source-3-1,Mykola ,23,100
//  source-3-2,Jon,34,76
//  source-3-3,Marry,25,123




// Read csv files not use schema

val sourceDF = spark.read.csv(sourcesFolders:_*)

sourceDF.show(false)
//  +     +   -+ -+ -+
//  |_c0       |_c1    |_c2|_c3|
//  +     +   -+ -+ -+
//  |source-1-1|Mykola |23 |100|
//  |source-1-2|Jon    |34 |76 |
//  |source-1-3|Marry  |25 |123|
//  |source-2-1|Mykola |23 |100|
//  |source-2-2|Jon    |34 |76 |
//  |source-2-3|Marry  |25 |123|
//  |source-3-1|Mykola |23 |100|
//  |source-3-2|Jon    |34 |76 |
//  |source-3-3|Marry  |25 |123|
//  +     +   -+ -+ -+



// Read csv files use schema

val schema = StructType(
  List(
    StructField("id", StringType, true),
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("NotKnow", IntegerType, true)
  )
)

val source2DF = spark.read.schema(schema).csv(sourcesFolders:_*)

source2DF.show(false)
//  +     +   -+ -+   -+
//  |id        |name   |age|NotKnow|
//  +     +   -+ -+   -+
//  |source-1-1|Mykola |23 |100    |
//  |source-1-2|Jon    |34 |76     |
//  |source-1-3|Marry  |25 |123    |
//  |source-2-1|Mykola |23 |100    |
//  |source-2-2|Jon    |34 |76     |
//  |source-2-3|Marry  |25 |123    |
//  |source-3-1|Mykola |23 |100    |
//  |source-3-2|Jon    |34 |76     |
//  |source-3-3|Marry  |25 |123    |
//  +     +   -+ -+   -+

相关问题 更多 >