如何在pyspark数据框中使用窗口函数

2 投票
2 回答
50 浏览
提问于 2025-04-13 15:57

我有一个pyspark的数据框,内容如下:

Mail            sno     mail_date       date1       present
abc@abc.com     790     2024-01-01      2024-02-06  yes
abc@abc.com     790     2023-12-23      2023-01-01  
nis@abc.com     101     2022-02-23                  
nis@abc.com     101     2021-01-20      2022-07-09  yes

在最终的数据框中,我需要每个的记录,包含所有最大日期的值,以及对应的最大日期在mail_date这一列中的值,针对present这一列。

所以最终的数据框应该是这样的:

Mail            sno     mail_date       date1       present
abc@abc.com     790     2024-01-01      2024-02-06  yes
nis@abc.com     101     2022-02-23      2022-07-09      

我有以下代码:

windowSpec=Window.partitionBy('Mail','sno')

df= df.withColumn('max_mail_date', F.max('mail_date').over(windowSpec))\
       .withColumn('max_date1', F.max('date1').over(windowSpec))
       
df1 = df.withColumn('mail_date', F.when(F.col('mail_date').isNotNull(), F.col('max_mail_date')).otherwise(F.col('mail_date')))\
        .drop('max_mail_date').dropDuplicates()

但是,我在present这一列中没有得到预期的值。

请给我一些建议,看看有什么可以改进的地方。

2 个回答

1

你可以创建一个窗口,这个窗口根据 'Mail', 'sno' 来划分,并且按照 mail_date 的顺序排列。在这个窗口里,你可以创建一个排名列,然后找出排名最高的 mail_date 以及与之对应的 present 列。接着,你可以通过对原始数据进行分组,按照 'Mail', 'sno' 来获取 max date1,然后把这两个结果在 'Mail', 'sno' 上进行合并。也许有更聪明的方法可以做到这一点,而不需要进行任何合并,但我不太确定怎么做。

window_spec = Window.partitionBy('Mail', 'sno').orderBy(F.desc('mail_date'))

df_with_max_date1 = df.groupby(
    'Mail','sno'
).agg(
    F.max('date1').alias('max_date1'),
)

df.withColumn(
    'rank', F.row_number().over(window_spec)
).where(
    F.col('rank') == 1
).select(
    'Mail','sno','mail_date','present'
).join(
    df_with_max_date1,
    on=['Mail','sno'],
    how='inner'
).select(
    'Mail',
    'sno',
    'mail_date',
    'max_date1',
    'present'
)

这将得到以下结果:

+-----------+---+----------+----------+-------+
|       Mail|sno| mail_date| max_date1|present|
+-----------+---+----------+----------+-------+
|abc@abc.com|790|2024-01-01|2024-02-06|    yes|
|nis@abc.com|101|2022-02-23|2022-07-09|       |
+-----------+---+----------+----------+-------+
1

虽然@DerekO的回答看起来没问题,但我想换个角度来讲这个问题。

我更喜欢使用 groupBy().agg() 的方法,结合 maxstruct。这种方法在处理大数据集时,能更有效地减少数据量。

那么,考虑到你提供的数据:

df_result = df.withColumn(
    "mail_date_struct", 
    F.struct(F.col("mail_date").alias("max_mail_date"), "present")
).groupBy("Mail", "sno").agg(
    F.max("mail_date_struct").alias("max_mail_date_struct"),
    F.max("date1").alias("max_date1")
).select(
    "Mail", 
    "sno",
    "max_mail_date_struct.max_mail_date",
    "max_mail_date_struct.present",
    "max_date1"
)

df_result.show()
# +-----------+---+-------------+-------+----------+
# |       Mail|sno|max_mail_date|present| max_date1|
# +-----------+---+-------------+-------+----------+
# |abc@abc.com|790|   2024-01-01|    yes|2024-02-06|
# |nis@abc.com|101|   2022-02-23|   NULL|2022-07-09|
# +-----------+---+-------------+-------+----------+

注意事项:

我建议的方法和窗口函数的方法之间的选择,取决于你的具体需求、数据集的特点以及性能的考虑。

对于大数据集,如果你想通过聚合来减少数据量,这种方法可能更有效。不过,如果你需要在计算分区指标时保持原始数据集的结构,窗口函数可能更合适,但可能会消耗更多的计算资源。

总是建议在你的一部分数据上测试这两种方法,以了解在你特定环境下的性能影响。

撰写回答