使用Python3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2
我在amazons3中读取对象,将它们压缩为parquet格式,并将它们添加(附加)到现有的parquet数据存储中。当我在pyspark shell中运行我的代码时,我能够读取/压缩对象并将新的拼花文件添加到现有的parquet文件中,当我对parquet数据运行查询时,它显示所有数据都在parquet文件夹中。但是,当我在EMR集群上的一个步骤中运行代码时,现有的parquet文件将被新文件覆盖。同样的查询将显示只有新数据存在,并且包含parquet数据的s3文件夹只有新数据。在
以下是步骤的关键代码:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
我希望这会将来自input_folder
的数据附加到parquet_folder
中的现有数据,但在EMR步骤中执行时,它会被覆盖。我尝试过在.saveAsTable
中没有mode='append'
(在pyspark shell中没有必要)。在
建议?在
我不知道为什么你的方法不起作用,但是我用}得到了更好的结果。我不知道这种行为的原因,但我以前没有见过
.parquet(path)
而不是{saveAsTable
用于保存数据对象,因为它在配置单元元存储区中创建了一个表(它不是一个“物理”数据对象)。在如果您的步骤运行在apachelivy中,那么它们的行为可能与在shell上的不同。如果您确实在使用Livy,您可以在齐柏林笔记本上测试代码,在代码单元上指出应该使用
%livy-pyspark
执行器来运行代码。在相关问题 更多 >
编程相关推荐