我有下面的pyspark df,它可以由代码重新创建
df = spark.createDataFrame([(1, "John Doe", "2020-11-30"),(2, "John Doe", "2020-11-27"),(3, "John Doe", "2020-11-29")],
("id", "name", "date"))
+---+--------+----------+
| id| name| date|
+---+--------+----------+
| 1|John Doe|2020-11-30|
| 2|John Doe|2020-11-27|
| 3|John Doe|2020-11-29|
+---+--------+----------+
我希望创建一个udf来计算两行日期(使用滞后函数)之间的差异,不包括周末,因为pyspark 2.2.0没有内置函数。2020年11月30日和2020年11月30日之间的差异;2020-11-27应该给出1,因为它们是星期一&;星期五
我试图在calculate difference between two dates excluding weekends in python的帮助下创建以下内容:
from pyspark.sql.functions import udf
import numpy as np
workdaUDF = udf(lambda z: workdays(z),IntegerType())
def workdays():
date1 = df.select(F.col('date')).collect()[1][0]
date2 = df.select(F.col('date')).collect()[0][0]
date_diff = np.busday_count(date1,date2)
return date_diff
df.withColumn("date_dif",workdaysUDF(F.col("date"))).show(truncate=False)
但是我得到了下面的错误
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
任何关于如何在数据帧的每一行上实现这一点的帮助都将非常有用
PS:My date1和date2变量需要是动态的,这取决于函数应用到的日期的值。此外,由于数据帧的大小,我无法使用pandas,因为我找到了多种解决方案
先谢谢你
您不能在UDF中调用
collect
。只能将列传递给UDF,因此应该传递日期列和lag
日期列,如下所示:相关问题 更多 >
编程相关推荐