How do I extract nested JSON column values from a PySpark DataFrame?

Asked 2025-04-14 17:22

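I am working with a PySpark DataFrame (es_query) that contains several nested JSON columns (r_json, brd_json, vs_json). I need help extracting data from these columns and storing it in another DataFrame (e_result) with two columns, URL and product number, where each row is a separate record.

In the end, all the values need to be in a single DataFrame.

Sample data for each of the columns mentioned above: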

r_json:
results:
0: {"col1": "Yes", "name": "", "col2":  1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "B0e28213", "url": "https://www.am"}
1: {"col1": "Yes", "name": "", "col2":  1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "019883", "url": "https://www.am"}

brd_json:
array:
0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "11873628", "rating": "4.1", "url": "https://www.amazon"}
1: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "001838", "rating": "4.1", "url": "https://www.amazon"}

vs_json:
array:
0: 0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "1212", "rating": "4.1", "url": "https://www.amazon"}
1: 0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "2321", "rating": "4.1", "url": "https://www.amazon"}

Could I get some help writing this script? Here is what I have tried:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

result_urls = results.select("url").withColumn("source", lit("result"))
brsd_json_url = brsd_json.select("url").withColumn("source", lit("brand"))
vis_json_url = brsd_json.select("url").withColumn("source", lit("video"))

combined_urls = result_urls.union(brsd_json_url).union(vis_json_url)

esearch_result = combined_urls.groupBy("source").agg({"url": "concat_ws"}).select("concat_ws(url) as prod_page_url_address")

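I also tried the code below, which extracts every column as a separate PySpark DataFrame, but the problem now is how to merge them into a single DataFrame that contains all the values.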

column_names = ["url", "productNumber", "col2", "col3", "col4", "col5", 
                "col6", "col7"]

# Define a function to create the expressions
def create_expr(column_name, json_column):
    try:
        expr = esearch_request_query.selectExpr(f"EXPLODE({json_column}.{column_name}) as {column_name}")
    except:
        expr = None
    return expr

# Iterate over the column names
for column_name in column_names:
    results_expr = create_expr(column_name, "result_json.re")
    brsd_json_url  = create_expr(column_name, "brsd_json")
    vis_json_url  = create_expr(column_name, "vis_json")

    globals()[f"results_{column_name}"] = results_expr
    globals()[f"brsd_json_{column_name}"] = brsd_json_url
    globals()[f"vis_json_{column_name}"] = vis_json_url

# case where "col2" does not exist in sponsored_video_json
try:
    results_brandName = esearch_request_query.selectExpr("EXPLODE(result_json.re.col2) as col2_name")
except:
    results_brandName = None

try:
    spond_col2 = esearch_request_query.selectExpr("EXPLODE(brsd_json.spon) as col2_name")
except:
    spond_col2 = None

spovideo_col2 = None  # Handle this case separately, as it does not exist in vis_json

1 Answer


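As you mentioned, you want to extract data from certain columns and store it in another DataFrame (e_result). I defined the data structure and tried the following approach: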

from pyspark.sql.functions import col, explode

# `spark`, `data` and `schema` are defined beforehand (not shown here).
es_query = spark.createDataFrame([data], schema=schema)

# Explode each array column into one row per element, then keep the two fields.
r_json_df = es_query.select(explode("r_json").alias("r_json")).select(
    col("r_json.productNumber").alias("productNumber"),
    col("r_json.url").alias("url")
)
brd_json_df = es_query.select(explode("brd_json").alias("brd_json")).select(
    col("brd_json.productNumber").alias("productNumber"),
    col("brd_json.url").alias("url")
)
vs_json_df = es_query.select(explode("vs_json").alias("vs_json")).select(
    col("vs_json.productNumber").alias("productNumber"),
    col("vs_json.url").alias("url")
)

# union matches columns by position, so all three selects use the same column order.
e_result = r_json_df.union(brd_json_df).union(vs_json_df)
e_result.show(truncate=False)

Result:

+-------------+------------------+
|productNumber|url               |
+-------------+------------------+
|B0e28213     |https://www.am    |
|019883       |https://www.am    |
|11873628     |https://www.amazon|
|001838       |https://www.amazon|
|1212         |https://www.amazon|
|2321         |https://www.amazon|
+-------------+------------------+
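  • The code above selects the r_json array column from es_query and explodes it so that each array element becomes its own row, then selects the productNumber and url fields from each exploded element.
  • It then extracts productNumber and url from the brd_json array column in the same way.
  • It does the same for the vs_json array column. Finally, union combines r_json_df, brd_json_df and vs_json_df into a single DataFrame, e_result.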
