我正在为一个迁移项目将一个PLSQL函数转换为pyspark代码。 现有的场景是:informaticasq query有一个正在调用PLSQL函数的sql。示例:从d\u emp中选择empid、job、AssingmentStatus(empid、joiningDate)。 为了在pyspark代码中转换上述场景,我将上述查询作为数据帧并使用测向变换我试图调用函数“AssingmentStatus”。 “AssingmentStatus”函数我已经用pyspark代码重写了。 下面是我如何做到这一点的例子。 德昂普有700万条记录
# ~ 7 million records insq_df
SQ_df = spark.sql("select empid, job,joiningDate from d_emp")
# Created empty DF first.
tmp_empty_df = spark.sql("select empid, joiningdate from d_emp limit 0 ")
tmp_empty_df = tmp_empty_df .withColumn('Assignstatus', lit(0))
for row in sq_df.collect():
i_empid = row['empid']
i_joiningDate = row['joiningDate]
# createed DF for one row
row_df = spark.sql("select {0} as empid,{1} as
joiningDate".format(i_empid ,i_joiningDate ))
#use transform to call function AssingmentStatus
return_df = (row_df.transform(lambda row_df: AssingmentStatus(empid,
joiningDate)))
# I will get 0 or 1 as return status from function
t_assignedstatus = return_df .head()[0]
#create a df for each row
assignmentdf = test_df.withColumn("t_assignedstatus",
lit(t_assignedstatus))
# append each df coming for each row to empty df
tmp_empty_df = tmp_empty_df.union(assignmentdf)
#After for loop ends get final DF.
tmp_emp_activity_fn_status.show()
AssingmentStatus函数有很大的逻辑和递归调用,这取决于条件,我已经将plsql cursor改为dataframe,并使用pyspark语法重新编码plsql函数(这是一个很大的代码,所以不粘贴在这里),但最后返回的是0或1。你知道吗
我的代码对少数记录(例如100条)运行良好,但如果我想对所有记录(约700万条)运行它,它将永远运行。你知道吗
我能在这里做些什么吗? 或者其他我可以写这个功能的方法? 我是新来的火花,所以任何帮助都非常感谢。你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐