PySpark: identify overlapping records by timestamp and drop the older overlapping rows
This is a question about overlapping time ranges in PySpark.
Sample data:
data = [
(1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
(1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
(1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
(1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
(3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
columns = ["station_id", "start_time", "end_time", "partition_date"]
Using the start_time and end_time fields, I want to find overlapping records within the same station (station_id). Once overlapping records are found, I want to keep only the rows with the latest partition_date and drop the overlapping rows with older partition dates.
The result I expect is:
output = [
(1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
(1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
(3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]
I have tried several approaches, including joins and window functions, but none of them produced the result I want. My goal is to identify the overlapping rows, keep only the latest of each overlapping set, drop the rest, and keep all non-overlapping rows. In other words, for a given station_id there should be only one record per time slot; for example, between 5:00 and 6:00 a station should have exactly one record.
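To be precise, by "overlap" I mean the standard half-open interval test: two intervals [s1, e1) and [s2, e2) overlap iff s1 < e2 and s2 < e1, so intervals that merely touch (one ends exactly when the other starts) do not count. A minimal sketch (the helper name `overlaps` is just for illustration):

```python
def overlaps(s1, e1, s2, e2):
    # half-open intervals [s1, e1) and [s2, e2) overlap
    # iff each one starts before the other ends
    return s1 < e2 and s2 < e1

# ISO-8601 timestamps compare correctly as plain strings
print(overlaps("2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z",
               "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z"))  # True
print(overlaps("2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z",
               "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z"))  # False: touching, not overlapping
```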
One solution I tried:
dl_ota_airings_df_dup = dl_ota_airings_df_3.selectExpr(
    "station_id as station_id2", "start_time as start_time2",
    "end_time as end_time2", "content_id as content_id2",
    "partition_date as partition_date2")

join_condition = ((dl_ota_airings_df_3["station_id"] == dl_ota_airings_df_dup["station_id2"]) &
                  ((dl_ota_airings_df_3["start_time"] >= dl_ota_airings_df_dup["start_time2"]) &
                   (dl_ota_airings_df_3["start_time"] < dl_ota_airings_df_dup["end_time2"]))
                  |
                  ((dl_ota_airings_df_3["end_time"] <= dl_ota_airings_df_dup["end_time2"]) &
                   (dl_ota_airings_df_3["end_time"] > dl_ota_airings_df_dup["start_time2"])))
# &
# ((dl_ota_airings_df_3["start_time"] != dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["end_time"] != dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["partition_date"] != dl_ota_airings_df_dup["partition_date2"])))

df_overlapping = dl_ota_airings_df_3.join(dl_ota_airings_df_dup, join_condition, "left")

dl_ota_airings_df_4 = (df_overlapping
    .filter("station_id2 is null or (partition_date > partition_date2)")
    .drop("station_id2", "start_time2", "end_time2", "content_id2", "partition_date2")
    .dropDuplicates())
I keep running into edge cases that my logic fails to capture, since for any set of overlapping records of a station I want to keep only the single latest record and drop the others. Any suggestions or pointers in the right direction would be appreciated.
1 Answer
You can try the code below. Two details matter: F.lag alone only flags the later row of an overlapping pair, so F.lead is needed as well to flag the earlier one, and partition_date is in M/d/yy format, so it must be parsed as a date before comparing, otherwise the comparison is lexicographic.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("station_id").orderBy("start_time")

# a row overlaps if it starts before the previous row ends,
# or if the next row starts before it ends
result_df = df.withColumn("prev_end", F.lag("end_time").over(windowSpec)) \
    .withColumn("next_start", F.lead("start_time").over(windowSpec)) \
    .withColumn("overlap", (F.col("start_time") < F.col("prev_end")) | (F.col("next_start") < F.col("end_time"))) \
    .withColumn("pd", F.to_date("partition_date", "M/d/yy")) \
    .withColumn("max_pd", F.max("pd").over(Window.partitionBy("station_id"))) \
    .filter(F.coalesce(~F.col("overlap"), F.lit(True)) | (F.col("pd") == F.col("max_pd"))) \
    .select("station_id", "start_time", "end_time", "partition_date")
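As a sanity check, the same keep/drop rule can be sketched in plain Python, with no Spark needed: a row counts as overlapping if it collides with either its previous or its next neighbour within the station, and partition dates (M/d/yy) are parsed before comparing. The helper names `pdate` and `dedup` are just for illustration.

```python
from datetime import datetime
from itertools import groupby

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

def pdate(s):
    # parse "1/24/24" so partition dates compare chronologically
    return datetime.strptime(s, "%m/%d/%y")

def dedup(rows):
    kept = []
    # sort by station, then start time (ISO strings sort chronologically)
    rows = sorted(rows, key=lambda r: (r[0], r[1]))
    for _, g in groupby(rows, key=lambda r: r[0]):
        g = list(g)
        max_pd = max(pdate(r[3]) for r in g)
        for i, r in enumerate(g):
            prev_end = g[i - 1][2] if i > 0 else None
            next_start = g[i + 1][1] if i + 1 < len(g) else None
            # overlap with previous or next neighbour
            overlap = ((prev_end is not None and r[1] < prev_end)
                       or (next_start is not None and next_start < r[2]))
            if not overlap or pdate(r[3]) == max_pd:
                kept.append(r)
    return kept

print(dedup(data))  # matches the expected output above
```

Running this on the sample data reproduces the expected output: the two 1/25/24 rows for station 1 plus the untouched station 3 row.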