I'm doing some analysis of NYC Uber pickup data in a Jupyter notebook on a Google Dataproc cluster, but I've run into a problem trying to use a UDF to map lat/lon coordinates to neighborhood names. My PySpark dataframe has the following schema:
root
|-- dt: timestamp (nullable = true)
|-- Lat: float (nullable = true)
|-- Lon: float (nullable = true)
|-- Base: string (nullable = true)
and looks like this (sample rows omitted here). What I want to do is take the `Lat` and `Lon` coordinates, compare them against a geojson shapefile of NYC neighborhoods, and find the neighborhood each coordinate falls in, so I can add that information as a column. What I want in the end is:
+-------------------+-------+--------+------+----------------+
| dt| Lat| Lon| Base| Loc|
+-------------------+-------+--------+------+----------------+
|2014-04-01 00:11:00| 40.769|-73.9549|B02512| Flushing|
|2014-04-01 00:17:00|40.7267|-74.0345|B02512| Upper East Side|
|2014-04-01 00:21:00|40.7316|-73.9873|B02512| Lower East Side|
|2014-04-01 00:28:00|40.7588|-73.9776|B02512| Bayside|
|2014-04-01 00:33:00|40.7594|-73.9722|B02512| Williamsburg|
+-------------------+-------+--------+------+----------------+
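The lookup described above is a point-in-polygon test: each (lon, lat) pair is checked against the exterior ring of every geojson feature. As a minimal local sketch (using a toy unit-square "neighborhood" rather than the real geojson file), this is what `matplotlib.path` does:

```python
# Minimal local sketch of the point-in-polygon lookup, using a toy
# square polygon as a stand-in for one geojson feature's exterior ring.
from matplotlib import path

# Hypothetical "neighborhood" boundary: a unit square.
square = path.Path([(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)])

print(square.contains_point((0.5, 0.5)))  # point inside the polygon
print(square.contains_point((2.0, 2.0)))  # point outside the polygon
```

Running this locally (outside Spark) is a quick way to confirm the geometry logic is right before debugging the UDF itself.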
I defined the following function:
from matplotlib import path
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def add_neighborhood(lon, lat):
    # geofile is the parsed geojson of NYC neighborhoods, loaded earlier
    for i in geofile['features']:
        p = path.Path(i['geometry']['coordinates'][0])
        if p.contains_point((lon, lat)):
            return i['properties']['neighborhood']

add_neighborhood_udf = udf(add_neighborhood, StringType())
But when I actually try to use it:
df2 = df2.withColumn('Loc', add_neighborhood_udf(df['Lon'], df['Lat']))
df2.show()
I get the following error:
Py4JJavaError: An error occurred while calling o227.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 188, spark-cluster-w-1.c.airy-environs-194607.internal, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
What I've been able to diagnose so far is that it seems related to these `[Ljava.lang.Objec...` objects, or to PySpark's ability to access matplotlib inside a UDF. Any suggestions on how I can fix this? Thanks.
Going through it step by step, I found that the error occurs when I call matplotlib's path to build the polygon from a set of coordinates:
p = path.Path(i['geometry']['coordinates'][0])
Why does this line raise the error on its own, when I'm not even returning the value `p` from the UDF function?
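One frequent cause of this kind of worker-side failure (an assumption here, not confirmed for this cluster) is that the UDF's closure fails to deserialize on the executors, e.g. because the module import or the captured `geofile` object doesn't survive pickling. A common pattern is to import inside the function body and close over only plain Python data; the sketch below uses a hypothetical `make_lookup` helper and a toy feature dict:

```python
# Sketch of a UDF body that imports matplotlib lazily and closes over
# plain-Python geojson data, so only picklable objects ship to executors.
# `make_lookup` and the toy feature below are hypothetical illustrations.

def make_lookup(features):
    def lookup(lon, lat):
        # Import inside the function so each Spark worker imports it itself.
        from matplotlib import path
        for feature in features:
            ring = path.Path(feature['geometry']['coordinates'][0])
            if ring.contains_point((lon, lat)):
                return feature['properties']['neighborhood']
        return None  # no polygon contains the point
    return lookup

# Toy feature: a unit-square "neighborhood" named 'TestHood'.
toy_features = [{
    'geometry': {'coordinates': [[(0.0, 0.0), (1.0, 0.0),
                                  (1.0, 1.0), (0.0, 1.0)]]},
    'properties': {'neighborhood': 'TestHood'},
}]

lookup = make_lookup(toy_features)
print(lookup(0.5, 0.5))

# On Spark this would then be registered as, e.g.:
#   add_neighborhood_udf = udf(make_lookup(geofile['features']), StringType())
```

Testing the inner function as plain Python first separates the geometry question from the Spark serialization question.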