我有下面的字典清单
结果=
[
{
"type:"check_datatype",
"kwargs":{
"table":"cars","column_name":"vin","d_type":"string"
}
},
{
"type":"check_emptystring",
"kwargs":{
"table":"cars","column_name":"vin"
}
},
{
"type:"check_null",
"kwargs":{
"table":"cars","columns":["vin","index"]
}
}
]
我想用下面的模式创建两个不同的pyspark数据帧-
当我们有唯一的一对(类型,kwargs)时,结果表中的args_id列将是相同的。这个JSON必须每天运行,因此如果它再次发现相同的一对(type,kwargs),它应该给出相同的args_id值
到现在为止,我已经写了这段代码-
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
check_type_results = [[elt['type']] for elt in results]
checkColumns = ['type']
spark = SparkSession.builder.getOrCreate()
checkResultsDF = spark.createDataFrame(data=check_type_results, schema=checkColumns)
checkResultsDF = checkResultsDF.withColumn("time", F.current_timestamp())
checkResultsDF = checkResultsDF.withColumn("args_id", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
checkResultsDF.printSchema()
现在,在我的代码中,我总是以递增的顺序获得args_id,这对于第一次运行是正确的,但是如果我在第二天再次运行json,或者可能是在同一天,并且json文件中有一对(type,kwargs)已经出现,所以我应该为这对使用相同的args_id
若某对(类型,kwargs)在Arguments表中并没有条目,那个么只有我将插入Arguments表,但若该对(类型,kwargs)已经存在于Arguments表中,那个么不应该在那个里进行插入
一旦这两个数据帧被正确填充,那么我想将它们加载到单独的增量表中
参数表中的Hashcode列是每个“kwargs”的唯一标识符
问题
您的模式有点不完整。更详细的模式将允许您利用更多spark功能。请参阅下面使用
spark-sql
和pyspark
的解决方案。与需要有序分区的窗口函数不同,您可以利用一些表生成数组函数,例如explode
和posexplode
,这些函数在spark-sql
中可用。由于它涉及到写入delta表,您可能会看到示例here解决方案1:使用Spark SQL
设置
模式定义
示例记录是一个结构/对象数组,其中
kwargs
是一个带有可选键的Maptype
。注意。True
表示可选,当缺少键或具有不同格式的条目时,应提供帮助可复制示例
结果
结果表生成
我已经使用
current_date
捕获了当前日期,但是您可以根据管道更改此日期结果
参数表生成
结果
解决方案2:使用UDF
您还可以使用已经实现的python逻辑定义用户定义的函数,并将其应用于spark
设置
我们将在这里定义函数来创建结果和参数表。我已选择创建生成器类型函数,但这是可选的
Pyspark设置
火花数据帧
提取结果表
输出
提取结果表
输出
参考文献
相关问题 更多 >
编程相关推荐