How do I extract nested JSON column values from a PySpark DataFrame?
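I am working with a PySpark DataFrame (es_query) that contains several nested JSON columns (r_json, brd_json, vs_json). I need help extracting the data from these columns and storing it in another DataFrame (e_result) as two columns, URL and product number, with each row holding a single record.
In the end, all of the values should be in a single DataFrame.
Sample data for the columns mentioned above: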
r_json:
results:
0: {"col1": "Yes", "name": "", "col2": 1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "B0e28213", "url": "https://www.am"}
1: {"col1": "Yes", "name": "", "col2": 1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "019883", "url": "https://www.am"}
brd_json:
array:
0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", , "col5": "No", "col6": false, "productNumber": "11873628", "rating": "4.1", "url": "https://www.amazon"}
1: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", , "col5": "No", "col6": false, "productNumber": "001838", "rating": "4.1", "url": "https://www.amazon"}
vs_json:
array:
0: 0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", , "col5": "No", "col6": false, "productNumber": "1212", "rating": "4.1", "url": "https://www.amazon"}
1: 0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", , "col5": "No", "col6": false, "productNumber": "2321", "rating": "4.1", "url": "https://www.amazon"}
Could I get some help writing this script? Here is what I have tried:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
result_urls = results.select("url").withColumn("source", lit("result"))
brsd_json_url = brsd_json.select("url").withColumn("source", lit("brand"))
vis_json_url = brsd_json.select("url").withColumn("source", lit("video"))
combined_urls = result_urls.union(brsd_json_url).union(vis_json_url)
esearch_result = combined_urls.groupBy("source").agg({"url": "concat_ws"}).select("concat_ws(url) as prod_page_url_address")
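I also tried the code below. It extracts every column as a separate PySpark DataFrame, but the problem now is how to merge them into a single DataFrame that includes all of the values.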
column_names = ["url", "productNumber", "col2", "col3", "col4", "col5",
                "col6", "col7"]

# Define a function to create the expressions
def create_expr(column_name, json_column):
    try:
        expr = esearch_request_query.selectExpr(f"EXPLODE({json_column}.{column_name}) as {column_name}")
    except:
        expr = None
    return expr

# Iterate over the column names
for column_name in column_names:
    results_expr = create_expr(column_name, "result_json.re")
    brsd_json_url = create_expr(column_name, "brsd_json")
    vis_json_url = create_expr(column_name, "vis_json")
    globals()[f"results_{column_name}"] = results_expr
    globals()[f"brsd_json_{column_name}"] = brsd_json_url
    globals()[f"vis_json_{column_name}"] = vis_json_url

# case where "col2" does not exist in sponsored_video_json
try:
    results_brandName = esearch_request_query.selectExpr("EXPLODE(result_json.re.col2) as col2_name")
except:
    results_brandName = None
try:
    sponnd_col2 = esearch_request_query.selectExpr("EXPLODE(brsd_json.spon) as col2_name")
except:
    spond_col2 = None
spovideo_col2 = None  # Handle this case separately, as it does not exist in vis_json
1 Answer
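As you mentioned, you want to extract the data from certain columns and store it in another DataFrame (e_result). I defined the data structure and tried the following approach: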
from pyspark.sql.functions import col, explode

# `spark`, `data` and `schema` are assumed to be defined beforehand (not shown here)
es_query = spark.createDataFrame([data], schema=schema)

# Explode each array column into one row per element, then keep productNumber and url
r_json_df = es_query.select(explode("r_json").alias("r_json")).select(
    col("r_json.productNumber").alias("productNumber"),
    col("r_json.url").alias("url")
)
brd_json_df = es_query.select(explode("brd_json").alias("brd_json")).select(
    col("brd_json.productNumber").alias("productNumber"),
    col("brd_json.url").alias("url")
)
vs_json_df = es_query.select(explode("vs_json").alias("vs_json")).select(
    col("vs_json.productNumber").alias("productNumber"),
    col("vs_json.url").alias("url")
)

# Stack the three results into a single DataFrame
e_result = r_json_df.union(brd_json_df).union(vs_json_df)
e_result.show(truncate=False)
Result:
+-------------+------------------+
|productNumber|url               |
+-------------+------------------+
|B0e28213     |https://www.am    |
|019883       |https://www.am    |
|11873628     |https://www.amazon|
|001838       |https://www.amazon|
|1212         |https://www.amazon|
|2321         |https://www.amazon|
+-------------+------------------+
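- The code above selects the array column r_json from es_query, explodes it so that each array element becomes its own row, and then selects the productNumber and url fields from the exploded r_json column.
- Next, it extracts productNumber and url from the brd_json array column in the same way.
- It then does the same for the vs_json array column. Finally, it uses union to combine r_json_df, brd_json_df and vs_json_df into a new DataFrame, e_result.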