使用Pyspark数据库中的1000个JSON文件

2024-04-18 06:54:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有大约2.5k的JSON文件,每个JSON文件代表1行。有了这些文件,我需要做一些非常简单的ETL,并将它们移到我的datalake的^{cd1>}部分。

我遍历我的datalake,并通过一个简单的^{cd2>}调用JSON文件,我在手前定义了JSON模式。

然后,我做ETL并尝试将这些文件写入datalake的单独部分,但是编写部分是非常exteryslow,只需15分钟编写一个只有几百kb的文件?

rp  = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())

for iter in iterable:
    #do stuff
    # filter my sparkDF with .filter
    SparkDF_F = sparkDF.filter(...)
    sparkDF_F.write('path/filename.parquet')

我试图使用“优化”并在我的路径上调用它

^{pr2}$

会引发以下错误。

^{pr3}$

有人能引导我了解我在这里的误解吗?

设置

  • Azure数据库
  • 6.0
  • 火花2.4
  • Python 3.6
  • 42GB群集,12个核心。
  • 4个节点
  • Azure Gen1 DataLake。

Tags: 文件pathjson定义模式etl代表filter
1条回答
网友
1楼 · 发布于 2024-04-18 06:54:38

两件事:

  1. 如果2.5k JSON文件存储在同一个文件夹中。您可以使用相同的文件夹路径直接读取它们:

    卢比=spark.read.json(path_common,multiLine=True,schema=json\u s).withColumn('path',F.input_file_name())

然后,您可以应用rp.过滤器在整个数据帧中,因为它只有一个(不需要对每个文件进行迭代)

  1. 你不仅可以直接对dbfs文件中的dbfs进行优化。因此,您可以使用dbfs中指向的目录创建表,并按照文档中的建议使用优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

希望这有帮助

相关问题 更多 >