从字典列表创建pyspark数据帧

2024-05-29 09:57:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有下面的字典清单

结果=

 [
    {
        "type:"check_datatype",
        "kwargs":{
            "table":"cars","column_name":"vin","d_type":"string"
            }
    },
    {
        "type":"check_emptystring",
        "kwargs":{
            "table":"cars","column_name":"vin"
            }
    },
    {
        "type:"check_null",
        "kwargs":{
            "table":"cars","columns":["vin","index"]
            }
    }
]

我想用下面的模式创建两个不同的pyspark数据帧-

Table Schems

当我们有唯一的一对(类型,kwargs)时,结果表中的args_id列将是相同的。这个JSON必须每天运行,因此如果它再次发现相同的一对(type,kwargs),它应该给出相同的args_id值

到现在为止,我已经写了这段代码-

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
check_type_results = [[elt['type']] for elt in results]
checkColumns = ['type']
spark = SparkSession.builder.getOrCreate()
checkResultsDF = spark.createDataFrame(data=check_type_results, schema=checkColumns)
checkResultsDF = checkResultsDF.withColumn("time", F.current_timestamp())
checkResultsDF = checkResultsDF.withColumn("args_id", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
checkResultsDF.printSchema()

现在,在我的代码中,我总是以递增的顺序获得args_id,这对于第一次运行是正确的,但是如果我在第二天再次运行json,或者可能是在同一天,并且json文件中有一对(type,kwargs)已经出现,所以我应该为这对使用相同的args_id

若某对(类型,kwargs)在Arguments表中并没有条目,那个么只有我将插入Arguments表,但若该对(类型,kwargs)已经存在于Arguments表中,那个么不应该在那个里进行插入

一旦这两个数据帧被正确填充,那么我想将它们加载到单独的增量表中

参数表中的Hashcode列是每个“kwargs”的唯一标识符


Tags: fromimportid类型sqlchecktypetable
1条回答
网友
1楼 · 发布于 2024-05-29 09:57:38

问题

您的模式有点不完整。更详细的模式将允许您利用更多spark功能。请参阅下面使用spark-sqlpyspark的解决方案。与需要有序分区的窗口函数不同,您可以利用一些表生成数组函数,例如explodeposexplode,这些函数在spark-sql中可用。由于它涉及到写入delta表,您可能会看到示例here

解决方案1:使用Spark SQL

设置

from pyspark.sql.types import ArrayType,StructType, StructField, StringType, MapType
from pyspark.sql import Row, SparkSession

sparkSession = SparkSession.builder.appName("Demo").getOrCreate()

模式定义

示例记录是一个结构/对象数组,其中kwargs是一个带有可选键的Maptype。注意。True表示可选,当缺少键或具有不同格式的条目时,应提供帮助

schema = StructType([
    StructField("entry",ArrayType(
        StructType([
            StructField("type",StringType(),True),
            StructField("kwargs",MapType(StringType(),StringType()),True)
        ])
    ),True)
])

可复制示例

result_entry =[
    {
        "type":"check_datatype",
        "kwargs":{
            "table":"cars","column_name":"vin","d_type":"string"
            }
    },
    {
        "type":"check_emptystring",
        "kwargs":{
            "table":"cars","column_name":"vin"
            }
    },
    {
        "type":"check_null",
        "kwargs":{
            "table":"cars","columns":["vin","index"]
            }
    }
]

df_results = sparkSession.createDataFrame([Row(entry=result_entry)],schema=schema)
df_results.createOrReplaceTempView("df_results")
df_results.show()

结果

+          +
|               entry|
+          +
|[{check_datatype,...|
+          +

结果表生成

我已经使用current_date捕获了当前日期,但是您可以根据管道更改此日期

results_table = sparkSession.sql("""
WITH raw_results as (
    SELECT 
        posexplode(entry),
        current_date as time
    FROM
        df_results
)
SELECT
    col.type as Type,
    time,
    pos as arg_id
FROM
    raw_results
""")

results_table.show()

结果

+        -+     +   +
|             Type|      time|arg_id|
+        -+     +   +
|   check_datatype|2021-03-31|     0|
|check_emptystring|2021-03-31|     1|
|       check_null|2021-03-31|     2|
+        -+     +   +

参数表生成

args_table = sparkSession.sql("""
WITH raw_results as (
    SELECT 
        posexplode(entry)
    FROM
        df_results
),
raw_arguments AS (
    SELECT
        explode(col.kwargs),
        pos as args_id
    FROM
        raw_results
),
raw_arguments_before_array_check AS (
SELECT
    args_id,
    key as bac_key,
    value as bac_value
    
FROM
    raw_arguments
),
raw_arguments_after_array_check AS (
SELECT
   args_id,
   bac_key,
   bac_value,
   posexplode(split(regexp_replace(bac_value,"[\\\[\\\]]",""),","))
FROM
   raw_arguments_before_array_check
)
SELECT
    args_id,
    bac_key as key,
    col as value,
    CASE
        WHEN bac_value LIKE '[%' THEN pos
        ELSE NULL
    END as list_index,
    abs(hash(args_id, bac_key,col,pos)) as hashcode
FROM
    raw_arguments_after_array_check
""")

args_table.show()

结果

+   -+     -+   +     +     +
|args_id|        key| value|list_index|  hashcode|
+   -+     -+   +     +     +
|      0|     d_type|string|      null| 216841494|
|      0|column_name|   vin|      null| 502458545|
|      0|      table|  cars|      null|1469121505|
|      1|column_name|   vin|      null| 604007568|
|      1|      table|  cars|      null| 784654488|
|      2|    columns|   vin|         0|1503105124|
|      2|    columns| index|         1| 454389776|
|      2|      table|  cars|      null| 858757332|
+   -+     -+   +     +     +

解决方案2:使用UDF

您还可以使用已经实现的python逻辑定义用户定义的函数,并将其应用于spark

设置

我们将在这里定义函数来创建结果和参数表。我已选择创建生成器类型函数,但这是可选的

result_entry =[
    {
        "type":"check_datatype",
        "kwargs":{
            "table":"cars","column_name":"vin","d_type":"string"
            }
    },
    {
        "type":"check_emptystring",
        "kwargs":{
            "table":"cars","column_name":"vin"
            }
    },
    {
        "type":"check_null",
        "kwargs":{
            "table":"cars","columns":["vin","index"]
            }
    }
]

import json
result_entry_str = json.dumps(result_entry)
result_entry_str

def extract_results_table(entry,current_date=None):
    if current_date is None:
        from datetime import date
        current_date = str(date.today())
    if type(entry)==str:
        import json
        entry = json.loads(entry)

    for arg_id,arg in enumerate(entry):
        yield {
            "Type":arg["type"],
            "time":current_date,
            "args_id":arg_id
        }

def extract_arguments_table(entry):
    if type(entry)==str:
        import json
        entry = json.loads(entry)

    for arg_id,arg in enumerate(entry):
        if "kwargs" in arg:
            for arg_entry in arg["kwargs"]:
                orig_key,orig_value = arg_entry, arg["kwargs"][arg_entry]
                if type(orig_value)==list:
                    for list_index,value in enumerate(orig_value):
                        yield {
                            "args_id":arg_id,
                            "key":orig_key,
                            "value":value,
                            "list_index":list_index,
                            "hash_code": hash((arg_id,orig_key,value,list_index))
                        }
                else:
                    yield {
                            "args_id":arg_id,
                            "key":orig_key,
                            "value":orig_value,
                            "list_index":None,
                            "hash_code": hash((arg_id,orig_key,orig_value,"null"))
                        }

Pyspark设置

from pyspark.sql.functions import udf,col,explode
from pyspark.sql.types import StructType,StructField,IntegerType,StringType, ArrayType

results_table_schema = ArrayType(StructType([
    StructField("Type",StringType(),True),
    StructField("time",StringType(),True),
    StructField("args_id",IntegerType(),True)
]),True)

arguments_table_schema = ArrayType(StructType([
    StructField("args_id",IntegerType(),True),
    StructField("key",StringType(),True),
    StructField("value",StringType(),True),
    StructField("list_index",IntegerType(),True),
    StructField("hash",StringType(),True)
]),True)

extract_results_table_udf = udf(lambda entry,current_date=None : [*extract_results_table(entry,current_date)],results_table_schema)
extract_arguments_table_udf = udf(lambda entry: [*extract_arguments_table(entry)],arguments_table_schema)

# this is useful if you intend to use your functions in spark-sql
sparkSession.udf.register('extract_results_table',extract_results_table_udf)
sparkSession.udf.register('extract_arguments_table',extract_arguments_table_udf)

火花数据帧

df_results_1 = sparkSession.createDataFrame([Row(entry=result_entry_str)],schema="entry string")
df_results_1.createOrReplaceTempView("df_results_1")
df_results_1.show()

提取结果表

# Using Spark SQL
sparkSession.sql("""
WITH results_table AS (
    select explode(extract_results_table(entry)) as entry FROM df_results_1
)
SELECT entry.* from results_table
""").show()
# Just python
df_results_1.select(
   explode(extract_results_table_udf(df_results_1.entry)).alias("entry")
).selectExpr("entry.*").show()

输出

+        -+     +   -+
|             Type|      time|args_id|
+        -+     +   -+
|   check_datatype|2021-03-31|      0|
|check_emptystring|2021-03-31|      1|
|       check_null|2021-03-31|      2|
+        -+     +   -+

+        -+     +   -+
|             Type|      time|args_id|
+        -+     +   -+
|   check_datatype|2021-03-31|      0|
|check_emptystring|2021-03-31|      1|
|       check_null|2021-03-31|      2|
+        -+     +   -+

提取结果表

# Using spark sql
sparkSession.sql("""
WITH arguments_table AS (
    select explode(extract_arguments_table(entry)) as entry FROM df_results_1
)
SELECT entry.* from arguments_table
""").show()
# Just python
df_results_1.select(
   explode(extract_arguments_table_udf(df_results_1.entry)).alias("entry")
).selectExpr("entry.*").show()

输出

+   -+     -+   +     +  +
|args_id|        key| value|list_index|hash|
+   -+     -+   +     +  +
|      0|      table|  cars|      null|null|
|      0|column_name|   vin|      null|null|
|      0|     d_type|string|      null|null|
|      1|      table|  cars|      null|null|
|      1|column_name|   vin|      null|null|
|      2|      table|  cars|      null|null|
|      2|    columns|   vin|         0|null|
|      2|    columns| index|         1|null|
+   -+     -+   +     +  +

+   -+     -+   +     +  +
|args_id|        key| value|list_index|hash|
+   -+     -+   +     +  +
|      0|      table|  cars|      null|null|
|      0|column_name|   vin|      null|null|
|      0|     d_type|string|      null|null|
|      1|      table|  cars|      null|null|
|      1|column_name|   vin|      null|null|
|      2|      table|  cars|      null|null|
|      2|    columns|   vin|         0|null|
|      2|    columns| index|         1|null|
+   -+     -+   +     +  +

参考文献

相关问题 更多 >

    热门问题