How to calculate the difference between dates excluding weekends in PySpark 2.2.0

Posted 2024-04-25 13:25:52


I have the PySpark DataFrame below, which can be recreated with this code:

df = spark.createDataFrame([(1, "John Doe", "2020-11-30"),(2, "John Doe", "2020-11-27"),(3, "John Doe", "2020-11-29")],
                            ("id", "name", "date")) 

+---+--------+----------+
| id|    name|      date|
+---+--------+----------+
|  1|John Doe|2020-11-30|
|  2|John Doe|2020-11-27|
|  3|John Doe|2020-11-29|
+---+--------+----------+

I want to create a UDF that calculates the difference between the dates in two rows (using the lag function), excluding weekends, since PySpark 2.2.0 has no built-in function for this. The difference between 2020-11-30 and 2020-11-27 should give 1, since they are a Monday and a Friday.
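For reference, NumPy's `busday_count` counts business days from the first date (inclusive) to the second (exclusive), with Saturday and Sunday skipped by default, which is why the Friday-to-Monday pair above should yield 1. A quick sketch:

```python
import numpy as np

# busday_count counts weekdays from the first date (inclusive)
# to the second (exclusive); Sat/Sun are excluded by default.
diff = int(np.busday_count("2020-11-27", "2020-11-30"))
print(diff)  # 1: only Friday 2020-11-27 falls in [2020-11-27, 2020-11-30)
```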

I tried to create the following, with the help of calculate difference between two dates excluding weekends in python:

from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np

def workdays(z):
    date1 = df.select(F.col('date')).collect()[1][0]
    date2 = df.select(F.col('date')).collect()[0][0]
    date_diff = np.busday_count(date1, date2)
    return int(date_diff)

workdaysUDF = udf(lambda z: workdays(z), IntegerType())

df.withColumn("date_dif", workdaysUDF(F.col("date"))).show(truncate=False)

But I get the following error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Any help on how to apply this to every row of the DataFrame would be much appreciated.

PS: my date1 and date2 variables need to be dynamic, depending on the value of the date the function is applied to. Also, because of the size of the DataFrame I cannot use pandas, for which I have found multiple solutions.

Thanks in advance.


1 Answer

#1 · Posted 2024-04-25 13:25:52

You cannot call collect inside a UDF. You can only pass columns to a UDF, so you should pass the date column and the lagged date column, like this:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([
    (1, "John Doe", "2020-11-30"),
    (2, "John Doe", "2020-11-27"),
    (3, "John Doe", "2020-11-29")],
    ("id", "name", "date")
) 

workdaysUDF = F.udf(lambda date1, date2: int(np.busday_count(date2, date1)) if (date1 is not None and date2 is not None) else None, IntegerType())
df = df.withColumn("date_dif", workdaysUDF(F.col('date'), F.lag(F.col('date')).over(Window.partitionBy('name').orderBy('id'))))
df.show()

+---+--------+----------+--------+
| id|    name|      date|date_dif|
+---+--------+----------+--------+
|  1|John Doe|2020-11-30|    null|
|  2|John Doe|2020-11-27|      -1|
|  3|John Doe|2020-11-29|       1|
+---+--------+----------+--------+
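To see why the signs come out this way: ordering by id, lag pairs each row's date with the previous row's date, and busday_count(prev, cur) is negative when cur is earlier than prev. A plain-Python sketch of the same pairing (mirroring the table above):

```python
import numpy as np

# Rows ordered by id; lag('date') pairs each date with the previous one.
rows = ["2020-11-30", "2020-11-27", "2020-11-29"]
pairs = zip([None] + rows[:-1], rows)  # (previous, current)
diffs = [int(np.busday_count(prev, cur)) if prev is not None else None
         for prev, cur in pairs]
print(diffs)  # [None, -1, 1] -- matches the date_dif column
```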
