我有大约2.5k的JSON文件,每个JSON文件代表1行。有了这些文件,我需要做一些非常简单的ETL,并将它们移到我的datalake的^{cd1>}部分。
我遍历我的datalake,并通过一个简单的^{cd2>}调用JSON文件,我在手前定义了JSON模式。
然后,我做ETL并尝试将这些文件写入datalake的单独部分,但是编写部分是非常exteryslow,只需15分钟编写一个只有几百kb的文件?
rp = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
for iter in iterable:
#do stuff
# filter my sparkDF with .filter
SparkDF_F = sparkDF.filter(...)
sparkDF_F.write('path/filename.parquet')
我试图使用“优化”并在我的路径上调用它
^{pr2}$会引发以下错误。
^{pr3}$有人能引导我了解我在这里的误解吗?
设置
两件事:
如果2.5k JSON文件存储在同一个文件夹中。您可以使用相同的文件夹路径直接读取它们:
卢比=spark.read.json(path_common,multiLine=True,schema=json\u s).withColumn('path',F.input_file_name())
然后,您可以应用rp.过滤器在整个数据帧中,因为它只有一个(不需要对每个文件进行迭代)
希望这有帮助
相关问题 更多 >
编程相关推荐