org.apache.spark.sql.AnalysisException: cannot resolve UDF(df["columnName"])

Posted 2024-05-29 04:35:49


I need to dynamically generate a select expression based on some conditions (using the coalesce function and a UDF, checkNullUdf). To do this, I wrote the following code:

column_expr = ''
for row in PRIORITIZATION.rdd.collect():
    fields = row.__fields__            # field names of this PRIORITIZATION row
    colName = row.Element              # target column this row prioritizes
    lst = ['']                         # sentinel entry so insert-by-priority works
    for i, col in enumerate(row):
        # Skip the first two fields and any empty priority cells.
        if col is not None and i > 1:
            dfColName = 'checkNullUdf(' + fields[i] + '["' + colName + '"])'
            lst.insert(int(col) - 1, dfColName)   # place by priority order
    lst.remove('')                     # drop the sentinel
    print(lst)
    column_expr += "coalesce(" + ",".join(lst) + "),"
column_expr = column_expr[:-1]         # strip the trailing comma
print("Final String is: " + column_expr)

This produces the following output, as expected:

Final String is: coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))

However, when I pass this string to a select expression as shown below, I get an error:

RESULT_REC = (GOLDEN_RECORD
    .join(BITPULSE, GOLDEN_RECORD.BitPulse_rec_Id == BITPULSE.UUID, "left_outer")
    .join(SAP, GOLDEN_RECORD.SAP_rec_Id == SAP.UUID, "left_outer")
    .join(WEBBIT, GOLDEN_RECORD.MDM_Well_Id == WEBBIT.UUID, "left_outer")
    .join(WELLDB, GOLDEN_RECORD.WellDB_rec_Id == WELLDB.UUID, "left_outer")
    .select(column_expr))

The error is as follows:

Py4JJavaError                             Traceback (most recent call last)
C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o75.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))`' given input columns: [State, Country, UUID, County, City, WellDB_rec_Id, Spud_Date, UUID, Well_Name, Water_Depth, State, Spud_Date, UUID, SAP_rec_Id, Country, Plug_Date, County, State, UUID, Spud_Date, Field_Name, BitPulse_rec_Id, Operator, Well_Name, Water_Depth, Country, MDM_Well_Id, UUID, Water_Depth, Field_Name, Well_Name, Long];;
'Project ['coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))]
+- Join LeftOuter, (WellDB_rec_Id#14 = UUID#124)
   :- Join LeftOuter, (MDM_Well_Id#11 = UUID#108)
   :  :- Join LeftOuter, (SAP_rec_Id#12 = UUID#88)
   :  :  :- Join LeftOuter, (BitPulse_rec_Id#13 = UUID#40)
   :  :  :  :- Relation[UUID#10,MDM_Well_Id#11,SAP_rec_Id#12,BitPulse_rec_Id#13,WellDB_rec_Id#14,Well_Name#15,Country#16,Operator#17,Field_Name#18,Long#19] csv
   :  :  :  +- Relation[UUID#40,Spud_Date#41,Plug_Date#42,Water_Depth#43,Field_Name#44,State#45,County#46] csv
   :  :  +- Relation[UUID#88,Well_Name#89,Country#90,City#91,State#92] csv
   :  +- Relation[UUID#108,Spud_Date#109,Water_Depth#110] csv
   +- Relation[UUID#124,Well_Name#125,State#126,County#127,Country#128,Spud_Date#129,Water_Depth#130] csv
--------------------------------------------------------------------
AnalysisException: 'cannot resolve \'`coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))`\' given input columns: [State, Country, UUID, County, City, WellDB_rec_Id, Spud_Date, UUID, Well_Name, Water_Depth, State, Spud_Date, UUID, SAP_rec_Id, Country, Plug_Date, County, State, UUID, Spud_Date, Field_Name, BitPulse_rec_Id, Operator, Well_Name, Water_Depth, Country, MDM_Well_Id, UUID, Water_Depth, Field_Name, Well_Name, Long];;\n\'Project [\'coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))]\n+- Join LeftOuter, (WellDB_rec_Id#14 = UUID#124)\n   :- Join LeftOuter, (MDM_Well_Id#11 = UUID#108)\n   :  :- Join LeftOuter, (SAP_rec_Id#12 = UUID#88)\n   :  :  :- Join LeftOuter, (BitPulse_rec_Id#13 = UUID#40)\n   :  :  :  :- Relation[UUID#10,MDM_Well_Id#11,SAP_rec_Id#12,BitPulse_rec_Id#13,WellDB_rec_Id#14,Well_Name#15,Country#16,Operator#17,Field_Name#18,Long#19] csv\n   :  :  :  +- Relation[UUID#40,Spud_Date#41,Plug_Date#42,Water_Depth#43,Field_Name#44,State#45,County#46] csv\n   :  :  +- Relation[UUID#88,Well_Name#89,Country#90,City#91,State#92] csv\n   :  +- Relation[UUID#108,Spud_Date#109,Water_Depth#110] csv\n   +- Relation[UUID#124,Well_Name#125,State#126,County#127,Country#128,Spud_Date#129,Water_Depth#130] csv\n'
20/04/07 00:53:33 INFO SparkContext: Invoking stop() from shutdown hook

However, if I copy and paste the final string output directly into the select expression, it works fine and I get the expected result.

I'm not sure why the error occurs only when passing column_expr.


Tags: name, id, date, uuid, record, country, state, sap
2 Answers

You can run a string SQL expression with spark.sql("SQL expression as a string"):

val employees = spark.createDataFrame(Seq(("E1",100.0,"a,b"), ("E2",200.0,"e,f"),(null,300.0,"c,d"))).toDF("employee","salary","clubs")

employees.createTempView("employees")
spark.sql("select coalesce(employee,salary) as emp_or_sal from employees").show()

Result:

+----------+
|emp_or_sal|
+----------+
|        E1|
|        E2|
|     300.0|
+----------+
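
For reference, a PySpark equivalent of the Scala demo above (a minimal sketch; same data and query):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Same data as the Scala snippet above.
employees = spark.createDataFrame(
    [("E1", 100.0, "a,b"), ("E2", 200.0, "e,f"), (None, 300.0, "c,d")],
    ["employee", "salary", "clubs"],
)

employees.createOrReplaceTempView("employees")
spark.sql("select coalesce(employee, salary) as emp_or_sal from employees").show()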

Use selectExpr instead of select. If you look at the error, you can see backticks around the whole expression, which means Spark is trying to find a column whose name matches the entire string. selectExpr evaluates each argument passed to it as a SQL expression, each yielding a column, whereas select expects each argument to be a named column. (Copy-pasting the string into select works because Python then parses it as code building Column objects, not as a single column name.)
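
A minimal sketch of how that could look here. It assumes checkNullUdf's underlying Python function (called check_null below, a stand-in name) is registered as a SQL function, that each source DataFrame is aliased before the join so qualifiers like GOLDEN_RECORD resolve, and that the generator emits SQL-style dotted references (GOLDEN_RECORD.Well_Name) as a list of strings rather than one comma-joined string of df["col"] references:

from pyspark.sql.types import StringType

# Register the UDF under the name used in the expression strings so the SQL
# parser can resolve it. check_null is a stand-in for the real Python function.
spark.udf.register("checkNullUdf", check_null, StringType())

# Alias each DataFrame so qualified references resolve inside SQL expressions.
g = GOLDEN_RECORD.alias("GOLDEN_RECORD")
s = SAP.alias("SAP")
joined = g.join(s, g.SAP_rec_Id == s.UUID, "left_outer")

# One SQL expression string per output column (shortened here for illustration).
exprs = [
    'coalesce(checkNullUdf(GOLDEN_RECORD.Well_Name), checkNullUdf(SAP.Well_Name)) AS Well_Name',
    'coalesce(checkNullUdf(GOLDEN_RECORD.Country)) AS Country',
]

# selectExpr parses each string as SQL; select would treat the whole string
# as a single column name.
RESULT_REC = joined.selectExpr(*exprs)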

Also worth noting:

It's hard to follow your intent from the posted code, but a few things look suspicious. The biggest one that jumps out at me is checkNullUdf: UDFs perform much worse than built-in functions. You also seem to be using collect to get column names; unless there is something special about their values, that is unnecessary, since you can just use DataFrame.columns.
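
For example, if checkNullUdf only turns blank strings into NULL (a guess, since its body isn't shown), a built-in equivalent avoids the Python UDF overhead entirely; df, a, and b below are placeholder names:

from pyspark.sql import functions as F

# Hypothetical built-in replacement for checkNullUdf, assuming it maps
# blank/empty strings to NULL so coalesce() can skip them.
def check_null(c):
    col = F.col(c) if isinstance(c, str) else c
    return F.when(F.trim(col) == "", None).otherwise(col)

# Column names come straight from DataFrame.columns, no collect() needed.
print(PRIORITIZATION.columns)

result = df.select(F.coalesce(check_null("a"), check_null("b")).alias("a_or_b"))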
