从S3读取大JSON文件(3K+)并从数组中选择特定的键

2024-05-17 16:16:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要读取存储在s3中的多个JSON文件(3K+),所有这些文件都具有相同的结构。这个结构非常大,而且是嵌套的。在这些文件中,是一个包含对象的数组,键:值对。我需要从这些键中选择一些,并将值写入PySpark数据帧。我正在使用PySpark/Python3在AWS Glue中编写代码。你知道吗

到目前为止,我尝试从S3文件创建一个数据帧,然后推断模式。我不确定这是否正确,也不确定这是否是最有效的。我也不知道下一步要在哪里找到“Products”数组并从数组中提取几个键。你知道吗

json_data_frame = spark.read.json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

json_schema = spark.read.json(json_data_frame.rdd.map(lambda row: row.json)).schema

我想要的结果是一个包含列的数据帧,每个列都是数组中的键,并且具有整个s3文件中的所有值。你知道吗

编辑:我做得更进一步了:

json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

final_data_frame_prep = json_data_frame.withColumn("name", json_data_frame["products"].getItem("name")).withColumn("ndc_product_code", json_data_frame["products"].getItem("ndc_product_code"))

final_data_frame = final_data_frame_prep.select("name","ndc_product_code")

final_data_frame.show(20,False)

我现在所处的位置,dataframe正在创建一个我可能会怀疑的值,除了每个值都是一个列表,有些是单个项,有些是多个。我现在需要把列表分成几行。如果你有什么建议,我会很乐意的。当前数据帧:

+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan] |[50419-150] |
|[Erbitux, Erbitux]|[66733-948, 66733-958]|
+------------------+----------------------+

编辑2:

json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

final_data_frame_prep = json_data_frame.withColumn("name", explode(json_data_frame["products"].getItem("name"))).withColumn("ndc_product_code", explode(json_data_frame["products"].getItem("ndc_product_code"))).withColumn("dosage_form", explode(json_data_frame["products"].getItem("dosage_form"))).withColumn("strength", explode(json_data_frame["products"].getItem("strength")))

final_data_frame = final_data_frame_prep.select("name","ndc_product_code","dosage_form","strength")

final_data_frame.show(20,False)

我可以在代码中添加一个explode,以及剩下的两列,但是在dataframe中看到了重复,好像列表匹配了所有的可能性,而不是匹配了键来自的数组中的对象。数据帧现在是:

+--------+----------------+-----------+---------+
|name |ndc_product_code|dosage_form|strength |
+--------+----------------+-----------+---------+
|Refludan|50419-150 |Powder |50 mg/1mL|
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
+--------+----------------+-----------+---------+

编辑3: 我不相信爆炸是我想要的。我把代码恢复到编辑1。该表显示为

+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan] |[50419-150] |
|[Erbitux, Erbitux]|[66733-948, 66733-958]|
+------------------+----------------------+

我想要的是:

+------------------+----------------------+
|name |ndc_product_code |
+------------------+----------------------+
|[Refludan]|[50419-150]|
|[Erbitux]|[66733-948]|
|[Erbitux]|[66733-958]|
+------------------+----------------------+

有没有办法做到这一点,即匹配数组中的位置并在此基础上创建新行?你知道吗


Tags: namejsondataargscode数组productframe
1条回答
网友
1楼 · 发布于 2024-05-17 16:16:37

我明白了!你知道吗

+    +        +     -+    -+
|name |ndc_product_code|dosage_form|strength |
+    +        +     -+    -+
|Refludan|50419-150 |Powder |50 mg/1mL|
|Erbitux |66733-948 |Solution |2 mg/1mL |
|Erbitux |66733-958 |Solution |2 mg/1mL |
+    +        +     -+    -+

代码是:

# Read in the json files from s3
json_data_frame = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json("s3://" + args['destinationBucketName'] + "/" + args['s3SourcePath'])

final_data_frame_prepprep = json_data_frame.withColumn("products_exp", explode(json_data_frame["products"]))\

final_data_frame_prep = final_data_frame_prepprep.withColumn("name", final_data_frame_prepprep["products_exp"].getItem("name"))\
                                             .withColumn("ndc_product_code", final_data_frame_prepprep["products_exp"].getItem("ndc_product_code"))\
                                             .withColumn("dosage_form", final_data_frame_prepprep["products_exp"].getItem("dosage_form"))\
                                             .withColumn("strength", final_data_frame_prepprep["products_exp"].getItem("strength"))

final_data_frame = final_data_frame_prep.select("name","ndc_product_code","dosage_form","strength")

final_data_frame.show(20,False)

关键是将数据分解为一个整体,然后从数组中获取项目,然后选择要保留的内容。我希望这能帮助别人干杯

相关问题 更多 >