Get the start and end time of a window in PySpark based on a condition

0 votes · 2 answers · 54 views · Asked 2025-04-12 16:47

Using PySpark on Databricks. First, find the earliest update time (minimum DateUpdated) in the dataset. Then, starting from that update time where Comp_BB_Status equals 1, check how long it takes for Comp_BB_Status to change back to 0. I want to identify the periods during which Comp_BB_Status is 1 and calculate the duration of each period in seconds and minutes.

SampleData

The expected output looks like this: expected_output

Any guidance is much appreciated. I am new to PySpark.

Edit

Code to create the DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize SparkSession
spark_a = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()

# Define schema for the DataFrame.
# DateUpdated is read as a string and converted to a timestamp below, because
# createDataFrame cannot parse ISO-8601 strings into a TimestampType field directly.
schema = StructType([
    StructField("CpoSku", StringType(), True),
    StructField("DateUpdated", StringType(), True),
    StructField("CPO_BB_Status", IntegerType(), True)
])

# Define data
data = [
    ("AAGN7013005", "2024-01-24T05:02:06.898+00:00", 0),
    ("AAGN7013005", "2024-01-24T05:07:05.090+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:42:56.825+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:48:01.647+00:00", 1),
    ("AAGN7013005", "2024-01-24T07:48:18.456+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:30:22.534+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:36:04.075+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:39:04.796+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:44:01.193+00:00", 1),
    ("AAGN7013005", "2024-01-24T17:00:06.217+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:07:16.612+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:13:04.639+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:33:03.796+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:38:28.834+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:35:43.995+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:40:45.930+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:09:30.167+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:14:56.294+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:53:01.281+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:03:27.103+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:08:05.096+00:00" ,1),
    ("AAGN7022205", "2024-01-24T05:53:22.652+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:04:59.031+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:04.285+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:34.285+00:01", 0)
]

# Create DataFrame (using the session created above) and parse the timestamp strings
df_test = spark_a.createDataFrame(data, schema=schema)
df_test = df_test.withColumn("DateUpdated", to_timestamp("DateUpdated", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))

# Show DataFrame schema and preview data
df_test.printSchema()
df_test.show()

# Stop SparkSession
spark_a.stop()

2 Answers

0

Here is one possible solution. The steps are as follows.

We know that a window starts when the previous status is 0 and the current status is 1.

Similarly, a window ends when the current status is 1 and the next status is 0.

To check these conditions, we can create two extra columns: one holding the lagged status and one holding the lead status.

By applying these conditions to the status, lag, and lead columns, we can mark the start and end of each window in a separate column.

The next step is to create a window-span marker, i.e. a group_id that identifies each window span.

Once the window spans are identified by a unique group_id, we simply group on the common columns and take the minimum date as the start date and the maximum date as the end date.

Here is the code:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *  # provides col, lit, to_timestamp, min, max, ...
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("extract start end dates").getOrCreate()

schema = T.StructType([
    T.StructField("CpoSku", T.StringType(), nullable=False),
    T.StructField("DateUpdated", T.StringType(), nullable=False),
    T.StructField("status", T.IntegerType(), nullable=False),
])

data = [
    ("AAGN7013005", "2024-01-24T05:02:06.898+00:00", 0),
    ("AAGN7013005", "2024-01-24T05:07:05.090+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:42:56.825+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:48:01.647+00:00", 1),
    ("AAGN7013005", "2024-01-24T07:48:18.456+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:30:22.534+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:36:04.075+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:39:04.796+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:44:01.193+00:00", 1),
    ("AAGN7013005", "2024-01-24T17:00:06.217+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:07:16.612+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:13:04.639+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:33:03.796+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:38:28.834+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:35:43.995+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:40:45.930+00:00", 0),
    ("AAGN7013005", "2024-01-25T01:03:50.742+00:00", 0),
    ("AAGN7013005", "2024-01-25T02:18:09.229+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:09:30.167+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:14:56.294+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:53:01.281+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:03:27.103+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:08:05.096+00:00", 1),
    ("AAGN7022205", "2024-01-24T05:53:22.652+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:04:59.031+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:04.285+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:34.285+00:00", 0),
]

df = spark.createDataFrame(data, schema=schema)

sample_df = df.withColumn("DateUpdated", to_timestamp(col("DateUpdated"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))

sample_df.show(n=200, truncate=False)

windowSpec = Window.partitionBy("CpoSku").orderBy("DateUpdated")


sample_df = sample_df.withColumn("lagged_status", F.lag(col("status"), 1).over(windowSpec))
sample_df = sample_df.withColumn("lead_status", F.lead(col("status"), 1).over(windowSpec))

sample_df = sample_df.withColumn("start_end_column", F.when(
                                                                        (
                                                                            (F.col("status") == F.lit(1))   &   (F.col("lagged_status") == F.lit(0))
                                                                        ), "start")
                                                                .when(
                                                                            ((F.col("status") == F.lit(1))   &   (F.col("lead_status") == F.lit(0)))
                                                                             , "end")
                                                                        .otherwise("nan"))

sample_df.show(n=200, truncate=False)


# Running sum of the start flags: each "start" increments the counter, so every
# row belonging to the same window shares the same group_id
start_indicator = F.when(F.col("start_end_column") == F.lit("start"), 1).otherwise(0)
group_id = F.sum(start_indicator).over(windowSpec)

sample_df = sample_df.withColumn("group_id", group_id)
sample_df.show(n=200, truncate=False)

sample_df = (
    sample_df.filter(col("status") == F.lit(1))
    .groupBy("CpoSku", "status", "group_id")
    .agg(min("DateUpdated").alias("start_end"), max("DateUpdated").alias("end_date"))
)
sample_df.show(n=200, truncate=False)

Output:

+-----------+------+--------+-----------------------+-----------------------+
|CpoSku     |status|group_id|start_end              |end_date               |
+-----------+------+--------+-----------------------+-----------------------+
|AAGN7013005|1     |1       |2024-01-24 10:37:05.09 |2024-01-24 23:37:16.612|
|AAGN7013005|1     |2       |2024-01-25 03:08:28.834|2024-01-25 04:05:43.995|
|AAGN7022205|1     |1       |2024-01-24 10:38:05.096|2024-01-24 12:13:04.285|
+-----------+------+--------+-----------------------+-----------------------+

The full output of each step is shown below:

+-----------+-----------------------+------+
|CpoSku     |DateUpdated            |status|
+-----------+-----------------------+------+
|AAGN7013005|2024-01-24 10:32:06.898|0     |
|AAGN7013005|2024-01-24 10:37:05.09 |1     |
|AAGN7013005|2024-01-24 12:12:56.825|1     |
|AAGN7013005|2024-01-24 12:18:01.647|1     |
|AAGN7013005|2024-01-24 13:18:18.456|1     |
|AAGN7013005|2024-01-24 15:00:22.534|1     |
|AAGN7013005|2024-01-24 15:06:04.075|1     |
|AAGN7013005|2024-01-24 16:09:04.796|1     |
|AAGN7013005|2024-01-24 16:14:01.193|1     |
|AAGN7013005|2024-01-24 22:30:06.217|1     |
|AAGN7013005|2024-01-24 23:37:16.612|1     |
|AAGN7013005|2024-01-24 23:43:04.639|0     |
|AAGN7013005|2024-01-25 03:03:03.796|0     |
|AAGN7013005|2024-01-25 03:08:28.834|1     |
|AAGN7013005|2024-01-25 04:05:43.995|1     |
|AAGN7013005|2024-01-25 04:10:45.93 |0     |
|AAGN7013005|2024-01-25 06:33:50.742|0     |
|AAGN7013005|2024-01-25 07:48:09.229|0     |
|AAGN7022205|2024-01-24 09:39:30.167|0     |
|AAGN7022205|2024-01-24 09:44:56.294|0     |
|AAGN7022205|2024-01-24 10:23:01.281|0     |
|AAGN7022205|2024-01-24 10:33:27.103|0     |
|AAGN7022205|2024-01-24 10:38:05.096|1     |
|AAGN7022205|2024-01-24 11:23:22.652|1     |
|AAGN7022205|2024-01-24 11:34:59.031|1     |
|AAGN7022205|2024-01-24 12:13:04.285|1     |
|AAGN7022205|2024-01-24 12:13:34.285|0     |
+-----------+-----------------------+------+

+-----------+-----------------------+------+-------------+-----------+----------------+
|CpoSku     |DateUpdated            |status|lagged_status|lead_status|start_end_column|
+-----------+-----------------------+------+-------------+-----------+----------------+
|AAGN7013005|2024-01-24 10:32:06.898|0     |NULL         |1          |nan             |
|AAGN7013005|2024-01-24 10:37:05.09 |1     |0            |1          |start           |
|AAGN7013005|2024-01-24 12:12:56.825|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 12:18:01.647|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 13:18:18.456|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 15:00:22.534|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 15:06:04.075|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 16:09:04.796|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 16:14:01.193|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 22:30:06.217|1     |1            |1          |nan             |
|AAGN7013005|2024-01-24 23:37:16.612|1     |1            |0          |end             |
|AAGN7013005|2024-01-24 23:43:04.639|0     |1            |0          |nan             |
|AAGN7013005|2024-01-25 03:03:03.796|0     |0            |1          |nan             |
|AAGN7013005|2024-01-25 03:08:28.834|1     |0            |1          |start           |
|AAGN7013005|2024-01-25 04:05:43.995|1     |1            |0          |end             |
|AAGN7013005|2024-01-25 04:10:45.93 |0     |1            |0          |nan             |
|AAGN7013005|2024-01-25 06:33:50.742|0     |0            |0          |nan             |
|AAGN7013005|2024-01-25 07:48:09.229|0     |0            |NULL       |nan             |
|AAGN7022205|2024-01-24 09:39:30.167|0     |NULL         |0          |nan             |
|AAGN7022205|2024-01-24 09:44:56.294|0     |0            |0          |nan             |
|AAGN7022205|2024-01-24 10:23:01.281|0     |0            |0          |nan             |
|AAGN7022205|2024-01-24 10:33:27.103|0     |0            |1          |nan             |
|AAGN7022205|2024-01-24 10:38:05.096|1     |0            |1          |start           |
|AAGN7022205|2024-01-24 11:23:22.652|1     |1            |1          |nan             |
|AAGN7022205|2024-01-24 11:34:59.031|1     |1            |1          |nan             |
|AAGN7022205|2024-01-24 12:13:04.285|1     |1            |0          |end             |
|AAGN7022205|2024-01-24 12:13:34.285|0     |1            |NULL       |nan             |
+-----------+-----------------------+------+-------------+-----------+----------------+

+-----------+-----------------------+------+-------------+-----------+----------------+--------+
|CpoSku     |DateUpdated            |status|lagged_status|lead_status|start_end_column|group_id|
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
|AAGN7013005|2024-01-24 10:32:06.898|0     |NULL         |1          |nan             |0       |
|AAGN7013005|2024-01-24 10:37:05.09 |1     |0            |1          |start           |1       |
|AAGN7013005|2024-01-24 12:12:56.825|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 12:18:01.647|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 13:18:18.456|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 15:00:22.534|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 15:06:04.075|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 16:09:04.796|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 16:14:01.193|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 22:30:06.217|1     |1            |1          |nan             |1       |
|AAGN7013005|2024-01-24 23:37:16.612|1     |1            |0          |end             |1       |
|AAGN7013005|2024-01-24 23:43:04.639|0     |1            |0          |nan             |1       |
|AAGN7013005|2024-01-25 03:03:03.796|0     |0            |1          |nan             |1       |
|AAGN7013005|2024-01-25 03:08:28.834|1     |0            |1          |start           |2       |
|AAGN7013005|2024-01-25 04:05:43.995|1     |1            |0          |end             |2       |
|AAGN7013005|2024-01-25 04:10:45.93 |0     |1            |0          |nan             |2       |
|AAGN7013005|2024-01-25 06:33:50.742|0     |0            |0          |nan             |2       |
|AAGN7013005|2024-01-25 07:48:09.229|0     |0            |NULL       |nan             |2       |
|AAGN7022205|2024-01-24 09:39:30.167|0     |NULL         |0          |nan             |0       |
|AAGN7022205|2024-01-24 09:44:56.294|0     |0            |0          |nan             |0       |
|AAGN7022205|2024-01-24 10:23:01.281|0     |0            |0          |nan             |0       |
|AAGN7022205|2024-01-24 10:33:27.103|0     |0            |1          |nan             |0       |
|AAGN7022205|2024-01-24 10:38:05.096|1     |0            |1          |start           |1       |
|AAGN7022205|2024-01-24 11:23:22.652|1     |1            |1          |nan             |1       |
|AAGN7022205|2024-01-24 11:34:59.031|1     |1            |1          |nan             |1       |
|AAGN7022205|2024-01-24 12:13:04.285|1     |1            |0          |end             |1       |
|AAGN7022205|2024-01-24 12:13:34.285|0     |1            |NULL       |nan             |1       |
+-----------+-----------------------+------+-------------+-----------+----------------+--------+

+-----------+------+--------+-----------------------+-----------------------+
|CpoSku     |status|group_id|start_end              |end_date               |
+-----------+------+--------+-----------------------+-----------------------+
|AAGN7013005|1     |1       |2024-01-24 10:37:05.09 |2024-01-24 23:37:16.612|
|AAGN7013005|1     |2       |2024-01-25 03:08:28.834|2024-01-25 04:05:43.995|
|AAGN7022205|1     |1       |2024-01-24 10:38:05.096|2024-01-24 12:13:04.285|
+-----------+------+--------+-----------------------+-----------------------+
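
The question also asks for the duration of each window in seconds and minutes. As a small follow-up sketch (not part of the code above), assuming the aggregated DataFrame with its start_end and end_date columns, the durations could be derived like this:

# Hypothetical follow-up: duration per window, derived from the aggregated timestamps
result_df = sample_df.withColumn(
    "duration_seconds",
    F.unix_timestamp("end_date") - F.unix_timestamp("start_end")
).withColumn(
    "duration_minutes",
    F.round(F.col("duration_seconds") / 60, 2)
)
result_df.show(truncate=False)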
1

Try the code below. Note that this answer assumes you always have a matching start/stop pair.

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, when, row_number, first

# Order by DateUpdated so that lag/lead look at the chronologically previous/next row
w0 = Window.partitionBy("CpoSku").orderBy("DateUpdated")
df_test = df_test.withColumn("lag", lag(col("CPO_BB_Status")).over(w0))
df_test = df_test.withColumn("lead", lead(col("CPO_BB_Status")).over(w0))

# Mark the first row of every 1-run as "start" and the last row as "stop"
df_test = df_test.withColumn(
    "cond",
    when((col("CPO_BB_Status") == 1) & (col("lag") == 0), "start")
    .otherwise(when((col("CPO_BB_Status") == 1) & (col("lead") == 0), "stop"))
)
df_test.show(truncate=False)

# Keep only the start/stop rows
df_test1 = df_test.filter((col("cond") == "start") | (col("cond") == "stop"))
df_test1.show(truncate=False)
df_test1 = df_test1.select("CpoSku", "DateUpdated", "cond")

# Number the rows so that each consecutive start/stop pair shares the same id
windowSpec = Window.orderBy("CpoSku", "DateUpdated")
df_test2 = df_test1.withColumn(
    "sequential_column",
    ((row_number().over(windowSpec) - 1) / 2 + 1).cast("int")
)
df_test2.show()

# Pivot so that start and stop become columns of the same row
df_test2 = df_test2.groupBy("CpoSku", "sequential_column").pivot("cond").agg(first("DateUpdated"))
df_test2.show(truncate=False)
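
The durations requested in the question can then be computed from the pivoted start and stop columns in the same way. A minimal sketch, assuming df_test2 as produced above (this follow-up is not part of the original answer):

from pyspark.sql.functions import unix_timestamp, round as spark_round

# Hypothetical follow-up: duration of each CPO_BB_Status == 1 period
df_result = df_test2.withColumn(
    "duration_seconds", unix_timestamp("stop") - unix_timestamp("start")
).withColumn(
    "duration_minutes", spark_round(col("duration_seconds") / 60, 2)
)
df_result.show(truncate=False)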
