如何在pyspark数据框中使用窗口函数
我有一个pyspark的数据框,内容如下:
Mail sno mail_date date1 present
abc@abc.com 790 2024-01-01 2024-02-06 yes
abc@abc.com 790 2023-12-23 2023-01-01
nis@abc.com 101 2022-02-23
nis@abc.com 101 2021-01-20 2022-07-09 yes
在最终的数据框中,我需要每个present
这一列。
所以最终的数据框应该是这样的:
Mail sno mail_date date1 present
abc@abc.com 790 2024-01-01 2024-02-06 yes
nis@abc.com 101 2022-02-23 2022-07-09
我有以下代码:
windowSpec=Window.partitionBy('Mail','sno')
df= df.withColumn('max_mail_date', F.max('mail_date').over(windowSpec))\
.withColumn('max_date1', F.max('date1').over(windowSpec))
df1 = df.withColumn('mail_date', F.when(F.col('mail_date').isNotNull(), F.col('max_mail_date')).otherwise(F.col('mail_date')))\
.drop('max_mail_date').dropDuplicates()
但是,我在present这一列中没有得到预期的值。
请给我一些建议,看看有什么可以改进的地方。
2 个回答
1
你可以创建一个窗口,这个窗口根据 'Mail', 'sno'
来划分,并且按照 mail_date
的顺序排列。在这个窗口里,你可以创建一个排名列,然后找出排名最高的 mail_date
以及与之对应的 present
列。接着,你可以通过对原始数据进行分组,按照 'Mail', 'sno'
来获取 max date1
,然后把这两个结果在 'Mail', 'sno'
上进行合并。也许有更聪明的方法可以做到这一点,而不需要进行任何合并,但我不太确定怎么做。
window_spec = Window.partitionBy('Mail', 'sno').orderBy(F.desc('mail_date'))
df_with_max_date1 = df.groupby(
'Mail','sno'
).agg(
F.max('date1').alias('max_date1'),
)
df.withColumn(
'rank', F.row_number().over(window_spec)
).where(
F.col('rank') == 1
).select(
'Mail','sno','mail_date','present'
).join(
df_with_max_date1,
on=['Mail','sno'],
how='inner'
).select(
'Mail',
'sno',
'mail_date',
'max_date1',
'present'
)
这将得到以下结果:
+-----------+---+----------+----------+-------+
| Mail|sno| mail_date| max_date1|present|
+-----------+---+----------+----------+-------+
|abc@abc.com|790|2024-01-01|2024-02-06| yes|
|nis@abc.com|101|2022-02-23|2022-07-09| |
+-----------+---+----------+----------+-------+
1
虽然@DerekO的回答看起来没问题,但我想换个角度来讲这个问题。
我更喜欢使用 groupBy().agg()
的方法,结合 max
和 struct
。这种方法在处理大数据集时,能更有效地减少数据量。
那么,考虑到你提供的数据:
df_result = df.withColumn(
"mail_date_struct",
F.struct(F.col("mail_date").alias("max_mail_date"), "present")
).groupBy("Mail", "sno").agg(
F.max("mail_date_struct").alias("max_mail_date_struct"),
F.max("date1").alias("max_date1")
).select(
"Mail",
"sno",
"max_mail_date_struct.max_mail_date",
"max_mail_date_struct.present",
"max_date1"
)
df_result.show()
# +-----------+---+-------------+-------+----------+
# | Mail|sno|max_mail_date|present| max_date1|
# +-----------+---+-------------+-------+----------+
# |abc@abc.com|790| 2024-01-01| yes|2024-02-06|
# |nis@abc.com|101| 2022-02-23| NULL|2022-07-09|
# +-----------+---+-------------+-------+----------+
注意事项:
我建议的方法和窗口函数的方法之间的选择,取决于你的具体需求、数据集的特点以及性能的考虑。
对于大数据集,如果你想通过聚合来减少数据量,这种方法可能更有效。不过,如果你需要在计算分区指标时保持原始数据集的结构,窗口函数可能更合适,但可能会消耗更多的计算资源。
总是建议在你的一部分数据上测试这两种方法,以了解在你特定环境下的性能影响。