AWS Glue job: SchemaColumnConvertNotSupportedException when writing Parquet files to S3

Posted 2024-05-19 00:22:52


I have a table in the AWS Glue Data Catalog whose columns are all typed as string; the underlying data is stored in S3 as Parquet files. I want to create a Glue job that simply reads the data from the catalog, partitions the files by date, and writes them back to S3. However, I keep getting a SchemaColumnConvertNotSupportedException saying a Parquet column cannot be converted.

I tried the ApplyMapping transform to make sure all the data is represented as strings in the DynamicFrame:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import input_file_name

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)

spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

dyf = glueContext.create_dynamic_frame.from_catalog(database = "processed", table_name = "compass_2_daily_tkt_parquet")

mapping_dyf = ApplyMapping.apply(frame = dyf, mappings = [
    ("tkt_id", "string", "tkt_id", "string"), ("tkt_nbr", "string", "tkt_nbr", "string"),
    ("tran_nbr", "string", "tran_nbr", "string"), ("pnr_ref", "string", "pnr_ref", "string"),
    ("provider_id", "string", "provider_id", "string"), ("carr_nbr", "string", "carr_nbr", "string"),
    ("agency_nbr", "string", "agency_nbr", "string"), ("point_of_orig_arpt_cd", "string", "point_of_orig_arpt_cd", "string"),
    ("dt_of_issue", "string", "dt_of_issue", "string"), ("curr_type_cd", "string", "curr_type_cd", "string"),
    ("fare_amt", "string", "fare_amt", "string"), ("fare_curr_type_cd", "string", "fare_curr_type_cd", "string"),
    ("tax_amt", "string", "tax_amt", "string"), ("fee_amt", "string", "fee_amt", "string"),
    ("comm_amt", "string", "comm_amt", "string"), ("doc_amt", "string", "doc_amt", "string"),
    ("pfc_amt", "string", "pfc_amt", "string"), ("proc_ped", "string", "proc_ped", "string"),
    ("intl_sale_ind", "string", "intl_sale_ind", "string"), ("e_tkt_ind", "string", "e_tkt_ind", "string"),
    ("fare_calc_ind", "string", "fare_calc_ind", "string"), ("tour_cd", "string", "tour_cd", "string"),
    ("dist_channel", "string", "dist_channel", "string"), ("cntry_cd", "string", "cntry_cd", "string"),
    ("stat_cd", "string", "stat_cd", "string"), ("tran_cd", "string", "tran_cd", "string"),
    ("data_source_cd", "string", "data_source_cd", "string"), ("data_sharing_id", "string", "data_sharing_id", "string"),
    ("load_ts", "timestamp", "load_ts", "timestamp"), ("net_fare_amt", "string", "net_fare_amt", "string"),
    ("suppl_fare_amt", "string", "suppl_fare_amt", "string"), ("contributed_ind", "string", "contributed_ind", "string"),
    ("print_cpui", "string", "print_cpui", "string"), ("file_id", "string", "file_id", "string"),
    ("print_fare_basis", "string", "print_fare_basis", "string"), ("waiver_cd", "string", "waiver_cd", "string"),
    ("comp_conj_cd", "string", "comp_conj_cd", "string"), ("tot_amt", "string", "tot_amt", "string"),
    ("bulk_ticketing_cd", "string", "bulk_ticketing_cd", "string"), ("equivalent_fare_amt", "string", "equivalent_fare_amt", "string"),
    ("equivalent_fare_curr_type_cd", "string", "equivalent_fare_curr_type_cd", "string"), ("fare_amt_usd", "string", "fare_amt_usd", "string"),
    ("tax_amt_usd", "string", "tax_amt_usd", "string"), ("doc_amt_usd", "string", "doc_amt_usd", "string"),
    ("fare_amt_eur", "string", "fare_amt_eur", "string"), ("tax_amt_eur", "string", "tax_amt_eur", "string"),
    ("doc_amt_eur", "string", "doc_amt_eur", "string"), ("depart_dt", "string", "depart_dt", "string"),
    ("trip_type_cd", "string", "trip_type_cd", "string"), ("tkt_dest_cd", "string", "tkt_dest_cd", "string"),
    ("dds_tkt_rndmzn_pcg", "string", "dds_tkt_rndmzn_pcg", "string")])

tkt_df = mapping_dyf.toDF().withColumn("filename", input_file_name())

# Attempt 1: repartition on the filename column, then write partitioned Parquet
tkt_df.repartition("filename").write.partitionBy("filename").mode("append").parquet("s3://landing-spot")

# Attempt 2: the same write via the partitionBy keyword argument of parquet()
tkt_df.write.parquet("s3://landing-spot", partitionBy=["filename"])

# Attempt 3: write the mapped DynamicFrame directly through the Glue sink
datasink = glueContext.write_dynamic_frame.from_options(frame = mapping_dyf, connection_type = "s3", connection_options = {"path": "s3://landing-spot/", "partitionKeys": ["filename"]}, format = "parquet")

job.commit()

This confuses me, because I am applying the mapping to the DynamicFrame, yet when the job writes the data out to S3, AWS Glue appears to try to infer the schema again.

