Can't get past "SparkException: Job aborted" error when trying to use a custom UDF

Posted 2024-04-26 13:06:48

I'm doing some analysis of New York Uber pickup data in a Jupyter notebook on a Google Dataproc cluster, but I'm running into trouble when I try to use a UDF to map lat/lon coordinates to neighborhood names. My PySpark DataFrame has the following schema:

root
 |-- dt: timestamp (nullable = true)
 |-- Lat: float (nullable = true)
 |-- Lon: float (nullable = true)
 |-- Base: string (nullable = true)

It looks like this:

+-------------------+-------+--------+------+
|                 dt|    Lat|     Lon|  Base|
+-------------------+-------+--------+------+
|2014-04-01 00:11:00| 40.769|-73.9549|B02512|
|2014-04-01 00:17:00|40.7267|-74.0345|B02512|
|2014-04-01 00:21:00|40.7316|-73.9873|B02512|
|2014-04-01 00:28:00|40.7588|-73.9776|B02512|
|2014-04-01 00:33:00|40.7594|-73.9722|B02512|
+-------------------+-------+--------+------+

What I want to do is take the Lat/Lon coordinates, compare them against a geojson shapefile of New York City neighborhoods, and find the neighborhood each coordinate falls in, so I can add that information as a column. In the end I want:

+-------------------+-------+--------+------+----------------+
|                 dt|    Lat|     Lon|  Base|             Loc|
+-------------------+-------+--------+------+----------------+
|2014-04-01 00:11:00| 40.769|-73.9549|B02512|        Flushing|
|2014-04-01 00:17:00|40.7267|-74.0345|B02512| Upper East Side|
|2014-04-01 00:21:00|40.7316|-73.9873|B02512| Lower East Side|
|2014-04-01 00:28:00|40.7588|-73.9776|B02512|         Bayside|
|2014-04-01 00:33:00|40.7594|-73.9722|B02512|    Williamsburg|
+-------------------+-------+--------+------+----------------+
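At its core this is a point-in-polygon lookup, which matplotlib's Path can do by itself. A minimal standalone sketch (toy square polygon, no Spark involved, just to show the check I'm relying on):

from matplotlib import path

# toy polygon: a unit square, given as (lon, lat) vertices
square = path.Path([(0, 0), (1, 0), (1, 1), (0, 1)])

print(square.contains_point((0.5, 0.5)))  # True, inside the square
print(square.contains_point((2.0, 2.0)))  # False, outside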

I defined the following function:

from matplotlib import path
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def add_neighborhood(lon, lat):
    # geofile is the parsed geojson of NYC neighborhood polygons
    for i in geofile['features']:
        # build a polygon from the feature's outer ring of coordinates
        p = path.Path(i['geometry']['coordinates'][0])
        if p.contains_point((lon, lat)):
            return i['properties']['neighborhood']

add_neighborhood_udf = udf(add_neighborhood, StringType())
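(geofile is the parsed geojson, loaded on the driver with something like the sketch below; the file name here is hypothetical.)

import json

# hypothetical file name; geofile is the NYC neighborhoods geojson parsed into a dict
with open('nyc_neighborhoods.geojson') as f:
    geofile = json.load(f)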

But when I actually try to use it:

df2 = df2.withColumn('Loc', add_neighborhood_udf(df['Lon'], df['Lat']))
df2.show()

I get the following error:

Py4JJavaError: An error occurred while calling o227.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 188, spark-cluster-w-1.c.airy-environs-194607.internal, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
    f, return_type = read_command(pickleSer, infile)

What I've been able to diagnose so far:

  1. I can get a simple UDF to work if it just returns the longitude as a new column, or longitude + 1, or similar simple math (see the sketch after this list).
  2. I can also solve this in pandas, but I have hundreds of millions of rows, so ideally I'd do it with PySpark.
  3. When I try to combine lon/lat into an array or tuple, what I get back is [Ljava.lang.Objec...].
  4. So I think the problem is either with that [Ljava.lang.Objec...] object or with PySpark's ability to access matplotlib inside the UDF.
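For reference, point 1 means something like the following does work (the column name is just an illustration):

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# trivial UDF doing simple math on one column; this kind of UDF runs fine
lon_plus_one_udf = udf(lambda lon: lon + 1.0, FloatType())
df2.withColumn('LonPlusOne', lon_plus_one_udf(df2['Lon'])).show(5)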

Any suggestions on how I can fix this? Thanks.

Edit:

Stepping through it one piece at a time, I found that the error happens when I call matplotlib's Path to build a polygon from a set of coordinates:

p = path.Path(i['geometry']['coordinates'][0])

Why does this line raise the error on its own, when I'm not even returning the value p from the UDF function?
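To isolate it, the same call can be run on the driver without Spark at all; a minimal check, assuming geofile is already loaded there:

from matplotlib import path

# run outside any UDF to confirm the Path construction itself works locally
feature = geofile['features'][0]
p = path.Path(feature['geometry']['coordinates'][0])
print(p.contains_point((-73.9549, 40.769)))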

