pyspark函数从plsql到pyspark代码的转换速度慢,因为循环正在扼杀性能

2024-04-26 09:45:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在为一个迁移项目将一个PLSQL函数转换为pyspark代码。 现有的场景是:informaticasq query有一个正在调用PLSQL函数的sql。示例:从d\u emp中选择empid、job、AssingmentStatus(empid、joiningDate)。 为了在pyspark代码中转换上述场景,我将上述查询作为数据帧并使用测向变换我试图调用函数“AssingmentStatus”。 “AssingmentStatus”函数我已经用pyspark代码重写了。 下面是我如何做到这一点的例子。 德昂普有700万条记录

 # ~ 7  million records insq_df
SQ_df = spark.sql("select empid, job,joiningDate from d_emp")

# Created empty DF first.
tmp_empty_df = spark.sql("select empid, joiningdate from d_emp limit 0 ")
tmp_empty_df  = tmp_empty_df .withColumn('Assignstatus', lit(0))

for row in sq_df.collect():
   i_empid = row['empid']
   i_joiningDate = row['joiningDate]

 # createed DF for one row
   row_df = spark.sql("select {0} as empid,{1} as 
            joiningDate".format(i_empid ,i_joiningDate ))
  #use transform to call function AssingmentStatus
   return_df = (row_df.transform(lambda row_df: AssingmentStatus(empid, 
        joiningDate)))
  # I will get 0 or 1 as return status from function
    t_assignedstatus = return_df .head()[0]
 #create a df for each row
    assignmentdf = test_df.withColumn("t_assignedstatus", 
      lit(t_assignedstatus))
   # append each df coming for each row to empty df
     tmp_empty_df  = tmp_empty_df.union(assignmentdf)
   #After for loop ends get final DF.
      tmp_emp_activity_fn_status.show()

AssingmentStatus函数有很大的逻辑和递归调用,这取决于条件,我已经将plsql cursor改为dataframe,并使用pyspark语法重新编码plsql函数(这是一个很大的代码,所以不粘贴在这里),但最后返回的是0或1。你知道吗

我的代码对少数记录(例如100条)运行良好,但如果我想对所有记录(约700万条)运行它,它将永远运行。你知道吗

我能在这里做些什么吗? 或者其他我可以写这个功能的方法? 我是新来的火花,所以任何帮助都非常感谢。你知道吗


Tags: 函数代码dfforsql记录tmpspark