将csv加载到PySp中的DataFrame时出现问题

2024-04-19 18:11:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图将一堆CSV文件聚合成一个文件,并使用AWS Glue中的ETL作业以ORC格式输出到S3。我的聚合CSV如下所示:

header1,header2,header3
foo1,foo2,foo3
bar1,bar2,bar3

我有一个名为aggregated_csv的聚合CSV的字符串表示,它包含header1,header2,header3\nfoo1,foo2,foo3\nbar1,bar2,bar3的内容。 我读到pyspark有一种直接的方法将CSV文件转换成数据帧(我需要这样做,这样我就可以利用Glue的能力在ORC中轻松地输出)。以下是我尝试过的一个片段:

^{pr2}$

我试过了,有没有找过。当我不调用seek()时,作业将成功完成,但df.show()不显示除头以外的任何数据。调用seek()时,会出现以下异常:

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-48-255.us-west-2.compute.internal:8020/user/root/header1,header2,header3\n;'

由于seek似乎改变了行为,而且csv中的头是异常字符串的一部分,我假设问题与我将文件传递给glueContext.read.csv()时文件光标的位置有关,但我不确定如何解决它。如果我取消了seek(0)调用的注释,并添加了一个agg_file.read()命令,我可以如预期那样看到文件的全部内容。我需要做些什么才能成功地读取我刚刚写入spark数据帧的csv文件?在


Tags: 文件csv数据作业seekglueorcfoo2
1条回答
网友
1楼 · 发布于 2024-04-19 18:11:37

我认为您给csv函数传递了错误的参数。我相信GlueContext.read.csv()将获得^{}的实例,它的签名将文件名作为第一个参数,而您传递的是一个类似文件的对象。在

def f(glueContext, aggregated_csv, schema):
    with open('somefile', 'a+') as agg_file:
        agg_file.write(aggregated_csv)
        #agg_file.seek(0)
    df = glueContext.read.csv('somefile', schema=schema, header="true")
    df.show()

但是,如果你想要它写一个ORC文件,并且你已经把数据读作aggregated_csv,你可以直接从元组列表中创建一个{}。在

^{pr2}$

然后,如果需要GlueDynamicFrame,请使用fromDF函数

dynF = fromDF(df, glueContext, 'myFrame')

还有一个,但是:你不需要胶水来写兽人-火花它完全可以。只需使用^{}函数:

df.write.orc('s3://path')

相关问题 更多 >