Pyspark使用AWS胶水将JSON列写入Postgres

2024-04-20 12:57:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下数据帧(df4):

+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|id                                  |exclusion_reason                          |created_at                |updated_at                |
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|4c01d951-2ec5-4ba4-bfe2-8ba9c3029962|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|dac14ca3-bf44-4e3c-80e8-0e2d6d2ff576|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|6d277012-ff6c-4202-bbd7-64cbd467ca28|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|0388163e-2614-4b71-b707-623337d58387|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|01daec52-408c-44e3-965a-b87daa334a1a|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
+------------------------------------+------------------------------------------+--------------------------+--------------------------+

我需要写入Postgres数据库。我使用的是AWS Glue,Postgres数据库位于VPC中,因此我需要使用Glue连接和glueContext.write_dynamic_frame.from_jdbc_conf方法。问题是我一直收到错误ERROR: column "matchback_exclusion_reason" is of type jsonb but expression is of type character。数据帧中的数据类型是string,数据库中的数据类型是JSONB

我看到有人建议我只需要将stringtype: "unspecified"添加到我的write语句中,但以下操作会产生相同的错误:

datasource2 = DynamicFrame.fromDF(df4, glueContext, "ParquetToWrite")

output = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource2, catalog_connection = "MPtest", connection_options = {"stringtype":"unspecified", "database" : "app", "dbtable" : "orders"})

我是否可以以某种方式将此列转换为JSON?我曾尝试创建一个结构类型来解析元素,但也没有成功(下面的代码):

schema = StructType([StructField("ship_reason", StringType()),StructField("bill_reason", StringType())])
df4Test.select(f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

df4Test = df4Test.withColumn("exclusion_reason", f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

是否可以将列类型修改为JSONB类型?理想情况下,我基本上只想“json.load”排除原因列,这样我就可以将其写入Postgres


Tags: from数据库类型schemaa1postgresframewrite
1条回答
网友
1楼 · 发布于 2024-04-20 12:57:02

^{}可以通过将exclusion_reason转换为json来帮助:

datasource3 = datasource2.resolveChoice(specs = [('exclusion_reason','cast:json')])

AWS Glue Developer Guide

Use ResolveChoice to specify how a column should be handled when it contains values of multiple types. You can choose to either cast the column to a single data type, discard one or more of the types, or retain all types in either separate columns or a structure. You can select a different resolution policy for each column or specify a global policy that is applied to all columns.

相关问题 更多 >