Py4JError when calling None.org.apache.spark.sql.execution.python.UserDefinedPythonFunction

Posted 2024-04-25 18:14:07


I defined a few simple functions, such as:

def median_func(xs):
    List_median=sorted(xs)
    if len(List_median)%2==0:
        result=(List_median[int(len(List_median)/2) - 1] + List_median[int(len(List_median)/2)])/2
    else:
        result=List_median[int(len(List_median)/2)]
    return result

## --------------------- ##
def max_func(xs):
    List_max=sorted(xs)
    return List_max[-1]

## --------------------- ##
def min_func(xs):
    List_min=sorted(xs)
    return List_min[0]

and wrapped them in UDFs using lambdas:

import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType, IntegerType

median_udf = sf.udf(lambda xs: median_func(xs), DoubleType())
max_udf = sf.udf(lambda xs: max_func(xs), IntegerType())
min_udf = sf.udf(lambda xs: min_func(xs), DoubleType())
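
As a sanity check outside Spark, the helper functions can be exercised directly in plain Python. The sketch below is an illustration and not part of the original post: it re-declares `median_func` with floor division (`//`) for the index and compares it against the standard library's `statistics.median`. It also makes one interpreter difference visible: `/` on two integers is true division in Python 3 but floor division in Python 2, so an even-length list of integers can yield a different median under the two versions.

```python
import statistics

def median_func(xs):
    """Median of a sequence, same logic as the post but with // for indexing."""
    ordered = sorted(xs)
    mid = len(ordered) // 2
    if len(ordered) % 2 == 0:
        # True division in Python 3; Python 2 would floor this for integer inputs.
        return (ordered[mid - 1] + ordered[mid]) / 2
    return ordered[mid]

odd_sample = [3.0, 1.0, 2.0]
even_sample = [4, 1, 3, 2]

odd_result = median_func(odd_sample)
even_result = median_func(even_sample)

# Cross-check against the standard library implementation.
assert odd_result == statistics.median(odd_sample)
assert even_result == statistics.median(even_sample)
print(odd_result, even_result)
```

If both assertions pass, the pure-Python logic is probably not what differs between the two environments.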

In PySpark, I apply the UDFs like this:

data_frame = data_frame.withColumn("Rolling_median_lat", median_udf(column_latitude))\
                .withColumn("Rolling_median_lon", median_udf(column_longitude))\
                .withColumn("Rolling_max_deltatime", max_udf(column_deltatime))

When I run the code above with Python 2.7 and PySpark 2.2.0, everything works. However, when I try the same code with Python 3.6, I get the following error:

Py4JError: An error occurred while calling None.org.apache.spark.sql.execution.python.UserDefinedPythonFunction. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.execution.python.UserDefinedPythonFunction([class java.lang.String, class org.apache.spark.api.python.PythonFunction, class org.apache.spark.sql.types.DoubleType$, class java.lang.Integer, class java.lang.Boolean]) does not exist
    at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
    at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
    at py4j.Gateway.invoke(Gateway.java:235)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

I'm not sure what the problem is. I tried a few things (not sure they are worth mentioning here), but nothing worked. What am I doing wrong?
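
One thing that may be worth ruling out, although the post does not confirm it, is a mismatch between the `pyspark` Python package in the Python 3.6 environment and the Spark JVM it talks to. A "Constructor ... does not exist" message from Py4J means the Python side called a JVM constructor with an argument list that the JVM class does not offer. The sketch below uses a hypothetical helper (`same_minor_version` is not a PySpark API) to compare two version strings; the commented lines show how the two values could be obtained, assuming an active `SparkSession` named `spark`.

```python
def same_minor_version(python_side, jvm_side):
    """Return True when both version strings share major.minor (e.g. '2.2')."""
    return python_side.split(".")[:2] == jvm_side.split(".")[:2]

# Illustrative values only; in a real session they could come from:
#   import pyspark
#   python_side = pyspark.__version__   # version of the Python package
#   jvm_side = spark.version            # version reported by the JVM
print(same_minor_version("2.2.0", "2.2.1"))
print(same_minor_version("2.3.0", "2.2.0"))
```

If the two sides report different minor versions, aligning them would be a reasonable first experiment before changing the UDF code itself.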

Edit:

The variable I defined:

column_latitude

This gives:

Column<b'array(latitude, Lag_latitude_1, Lead_latitude_1, Lag_latitude_2, Lead_latitude_2, Lag_latitude_3, Lead_latitude_3, Lag_latitude_4, Lead_latitude_4)'>

These are simple arrays of strings (column names).

Edit: this is my original DataFrame:

data_frame.head(2)

This gives me:

[Row(id=1234, movementdatetime=datetime.datetime(2017, 9, 4, 13, 57, 16), latitude=38.477, longitude=13.256, deltaTime_sec=3459, Lag_latitude_1=38.4593, Lead_latitude_1=38.4872, Lag_longitude_1=13.4902, Lead_longitude_1=13.1767, Lag_deltatime_1=25531, Lead_deltatime_1=1212, Lag_latitude_2=38.3432, Lead_latitude_2=39.5649, Lag_longitude_2=15.1879, Lead_longitude_2=2.6392, Lag_deltatime_2=3280, Lead_deltatime_2=20623078, Lag_latitude_3=38.331, Lead_latitude_3=39.5649, Lag_longitude_3=15.3842, Lead_longitude_3=2.6392, Lag_deltatime_3=3588, Lead_deltatime_3=14580, Lag_latitude_4=38.324, Lead_latitude_4=39.5649, Lag_longitude_4=15.6001, Lead_longitude_4=2.6391, Lag_deltatime_4=0, Lead_deltatime_4=7199),
Row(id=2345, movementdatetime=datetime.datetime(2017, 9, 4, 14, 17, 28), latitude=38.4872, longitude=13.1767, deltaTime_sec=1212, Lag_latitude_1=38.477, Lead_latitude_1=39.5649, Lag_longitude_1=13.256, Lead_longitude_1=2.6392, Lag_deltatime_1=3459, Lead_deltatime_1=20623078, Lag_latitude_2=38.4593, Lead_latitude_2=39.5649, Lag_longitude_2=13.4902, Lead_longitude_2=2.6392, Lag_deltatime_2=25531, Lead_deltatime_2=14580, Lag_latitude_3=38.3432, Lead_latitude_3=39.5649, Lag_longitude_3=15.1879, Lead_longitude_3=2.6391, Lag_deltatime_3=3280, Lead_deltatime_3=7199, Lag_latitude_4=38.331, Lead_latitude_4=39.5649, Lag_longitude_4=15.3842, Lead_longitude_4=2.6391, Lag_deltatime_4=3588, Lead_deltatime_4=10803)]

Basically, I have several columns (nine) across which I need to compute the median.
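
To make the target computation concrete, here is a plain-Python sketch (no Spark involved) of the median over the nine latitude values of the first row shown above. It uses `statistics.median` from the standard library rather than the post's `median_func`, and the variable names are illustrative.

```python
import statistics

# latitude plus its four lags and four leads, copied from the first Row above
first_row_latitudes = [
    38.477,            # latitude
    38.4593, 38.4872,  # Lag_latitude_1, Lead_latitude_1
    38.3432, 39.5649,  # Lag_latitude_2, Lead_latitude_2
    38.331, 39.5649,   # Lag_latitude_3, Lead_latitude_3
    38.324, 39.5649,   # Lag_latitude_4, Lead_latitude_4
]

# With nine values the median is the fifth element of the sorted list.
rolling_median_lat = statistics.median(first_row_latitudes)
print(rolling_median_lat)  # 38.477
```

This is the value that `Rolling_median_lat` would be expected to hold for that row once the UDF runs.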

