Get the start and end time of a window in PySpark based on a condition
I'm using PySpark on Databricks. First, find the earliest update time (minimum DateUpdated) in the dataset. Then, for the rows where CPO_BB_Status equals 1, check how long it takes from that update time until CPO_BB_Status flips back to 0. I want to identify the time windows during which CPO_BB_Status is 1 and compute the duration of each window in seconds and minutes.
Any guidance is much appreciated. I'm new to PySpark.
Edit
Code used to create the DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_timestamp, col

# Initialize SparkSession
spark_a = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()

# Define schema for the DataFrame.
# DateUpdated is read as a string and converted to a timestamp below,
# since createDataFrame cannot accept ISO-8601 strings for a TimestampType field.
schema = StructType([
    StructField("CpoSku", StringType(), True),
    StructField("DateUpdated", StringType(), True),
    StructField("CPO_BB_Status", IntegerType(), True)
])

# Define data
data = [
    ("AAGN7013005", "2024-01-24T05:02:06.898+00:00", 0),
    ("AAGN7013005", "2024-01-24T05:07:05.090+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:42:56.825+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:48:01.647+00:00", 1),
    ("AAGN7013005", "2024-01-24T07:48:18.456+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:30:22.534+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:36:04.075+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:39:04.796+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:44:01.193+00:00", 1),
    ("AAGN7013005", "2024-01-24T17:00:06.217+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:07:16.612+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:13:04.639+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:33:03.796+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:38:28.834+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:35:43.995+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:40:45.930+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:09:30.167+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:14:56.294+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:53:01.281+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:03:27.103+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:08:05.096+00:00", 1),
    ("AAGN7022205", "2024-01-24T05:53:22.652+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:04:59.031+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:04.285+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:34.285+00:01", 0)
]

# Create DataFrame and parse DateUpdated into a proper timestamp
df_test = spark_a.createDataFrame(data, schema=schema)
df_test = df_test.withColumn(
    "DateUpdated", to_timestamp(col("DateUpdated"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
)

# Show DataFrame schema and preview data
df_test.printSchema()
df_test.show()

# Stop SparkSession
spark_a.stop()
2 Answers
Here is one possible solution, with the following steps.

A window starts when the previous status is 0 and the current status is 1. Similarly, a window ends when the current status is 1 and the next status is 0. To check these conditions we can add two extra columns, one holding the lagged status and one holding the lead status. By applying the conditions to the status, lag and lead columns, we can mark the start and end of each window in a separate column.

The next step is to create a window-span marker, i.e. a group_id that identifies the span of each window. Once each window span has been identified by a unique group_id, we just need to group on the common columns and take the minimum date as the start date and the maximum date as the end date.

Here is the code for the implementation:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("extract start end dates").getOrCreate()

schema = T.StructType([
    T.StructField("CpoSku", T.StringType(), nullable=False),
    T.StructField("DateUpdated", T.StringType(), nullable=False),
    T.StructField("status", T.IntegerType(), nullable=False),
])

data = [
    ("AAGN7013005", "2024-01-24T05:02:06.898+00:00", 0),
    ("AAGN7013005", "2024-01-24T05:07:05.090+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:42:56.825+00:00", 1),
    ("AAGN7013005", "2024-01-24T06:48:01.647+00:00", 1),
    ("AAGN7013005", "2024-01-24T07:48:18.456+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:30:22.534+00:00", 1),
    ("AAGN7013005", "2024-01-24T09:36:04.075+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:39:04.796+00:00", 1),
    ("AAGN7013005", "2024-01-24T10:44:01.193+00:00", 1),
    ("AAGN7013005", "2024-01-24T17:00:06.217+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:07:16.612+00:00", 1),
    ("AAGN7013005", "2024-01-24T18:13:04.639+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:33:03.796+00:00", 0),
    ("AAGN7013005", "2024-01-24T21:38:28.834+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:35:43.995+00:00", 1),
    ("AAGN7013005", "2024-01-24T22:40:45.930+00:00", 0),
    ("AAGN7013005", "2024-01-25T01:03:50.742+00:00", 0),
    ("AAGN7013005", "2024-01-25T02:18:09.229+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:09:30.167+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:14:56.294+00:00", 0),
    ("AAGN7022205", "2024-01-24T04:53:01.281+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:03:27.103+00:00", 0),
    ("AAGN7022205", "2024-01-24T05:08:05.096+00:00", 1),
    ("AAGN7022205", "2024-01-24T05:53:22.652+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:04:59.031+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:04.285+00:00", 1),
    ("AAGN7022205", "2024-01-24T06:43:34.285+00:00", 0),
]

df = spark.createDataFrame(data, schema=schema)

# Parse the ISO-8601 strings into proper timestamps
sample_df = df.withColumn("DateUpdated", F.to_timestamp(F.col("DateUpdated"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
sample_df.show(n=200, truncate=False)

# Lag/lead of the status within each SKU, ordered by time
windowSpec = Window.partitionBy("CpoSku").orderBy("DateUpdated")
sample_df = sample_df.withColumn("lagged_status", F.lag(F.col("status"), 1).over(windowSpec))
sample_df = sample_df.withColumn("lead_status", F.lead(F.col("status"), 1).over(windowSpec))

# Mark the start (0 -> 1 transition) and end (1 -> 0 transition) of each window
sample_df = sample_df.withColumn(
    "start_end_column",
    F.when((F.col("status") == F.lit(1)) & (F.col("lagged_status") == F.lit(0)), "start")
     .when((F.col("status") == F.lit(1)) & (F.col("lead_status") == F.lit(0)), "end")
     .otherwise("nan"),
)
sample_df.show(n=200, truncate=False)

# A running count of "start" markers gives every window a unique group_id
start_indicator = F.when(F.col("start_end_column") == F.lit("start"), 1).otherwise(0)
group_id = F.sum(start_indicator).over(windowSpec)
sample_df = sample_df.withColumn("group_id", group_id)
sample_df.show(n=200, truncate=False)

# Keep only the status-1 rows and aggregate per window
sample_df = (
    sample_df.filter(F.col("status") == F.lit(1))
    .groupBy("CpoSku", "status", "group_id")
    .agg(F.min("DateUpdated").alias("start_end"), F.max("DateUpdated").alias("end_date"))
)
sample_df.show(n=200, truncate=False)
Output:
+-----------+------+--------+-----------------------+-----------------------+
|CpoSku |status|group_id|start_end |end_date |
+-----------+------+--------+-----------------------+-----------------------+
|AAGN7013005|1 |1 |2024-01-24 10:37:05.09 |2024-01-24 23:37:16.612|
|AAGN7013005|1 |2 |2024-01-25 03:08:28.834|2024-01-25 04:05:43.995|
|AAGN7022205|1 |1 |2024-01-24 10:38:05.096|2024-01-24 12:13:04.285|
+-----------+------+--------+-----------------------+-----------------------+
The full output from each step is shown below:
+-----------+-----------------------+------+
|CpoSku |DateUpdated |status|
+-----------+-----------------------+------+
|AAGN7013005|2024-01-24 10:32:06.898|0 |
|AAGN7013005|2024-01-24 10:37:05.09 |1 |
|AAGN7013005|2024-01-24 12:12:56.825|1 |
|AAGN7013005|2024-01-24 12:18:01.647|1 |
|AAGN7013005|2024-01-24 13:18:18.456|1 |
|AAGN7013005|2024-01-24 15:00:22.534|1 |
|AAGN7013005|2024-01-24 15:06:04.075|1 |
|AAGN7013005|2024-01-24 16:09:04.796|1 |
|AAGN7013005|2024-01-24 16:14:01.193|1 |
|AAGN7013005|2024-01-24 22:30:06.217|1 |
|AAGN7013005|2024-01-24 23:37:16.612|1 |
|AAGN7013005|2024-01-24 23:43:04.639|0 |
|AAGN7013005|2024-01-25 03:03:03.796|0 |
|AAGN7013005|2024-01-25 03:08:28.834|1 |
|AAGN7013005|2024-01-25 04:05:43.995|1 |
|AAGN7013005|2024-01-25 04:10:45.93 |0 |
|AAGN7013005|2024-01-25 06:33:50.742|0 |
|AAGN7013005|2024-01-25 07:48:09.229|0 |
|AAGN7022205|2024-01-24 09:39:30.167|0 |
|AAGN7022205|2024-01-24 09:44:56.294|0 |
|AAGN7022205|2024-01-24 10:23:01.281|0 |
|AAGN7022205|2024-01-24 10:33:27.103|0 |
|AAGN7022205|2024-01-24 10:38:05.096|1 |
|AAGN7022205|2024-01-24 11:23:22.652|1 |
|AAGN7022205|2024-01-24 11:34:59.031|1 |
|AAGN7022205|2024-01-24 12:13:04.285|1 |
|AAGN7022205|2024-01-24 12:13:34.285|0 |
+-----------+-----------------------+------+
+-----------+-----------------------+------+-------------+-----------+----------------+
|CpoSku |DateUpdated |status|lagged_status|lead_status|start_end_column|
+-----------+-----------------------+------+-------------+-----------+----------------+
|AAGN7013005|2024-01-24 10:32:06.898|0 |NULL |1 |nan |
|AAGN7013005|2024-01-24 10:37:05.09 |1 |0 |1 |start |
|AAGN7013005|2024-01-24 12:12:56.825|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 12:18:01.647|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 13:18:18.456|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 15:00:22.534|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 15:06:04.075|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 16:09:04.796|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 16:14:01.193|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 22:30:06.217|1 |1 |1 |nan |
|AAGN7013005|2024-01-24 23:37:16.612|1 |1 |0 |end |
|AAGN7013005|2024-01-24 23:43:04.639|0 |1 |0 |nan |
|AAGN7013005|2024-01-25 03:03:03.796|0 |0 |1 |nan |
|AAGN7013005|2024-01-25 03:08:28.834|1 |0 |1 |start |
|AAGN7013005|2024-01-25 04:05:43.995|1 |1 |0 |end |
|AAGN7013005|2024-01-25 04:10:45.93 |0 |1 |0 |nan |
|AAGN7013005|2024-01-25 06:33:50.742|0 |0 |0 |nan |
|AAGN7013005|2024-01-25 07:48:09.229|0 |0 |NULL |nan |
|AAGN7022205|2024-01-24 09:39:30.167|0 |NULL |0 |nan |
|AAGN7022205|2024-01-24 09:44:56.294|0 |0 |0 |nan |
|AAGN7022205|2024-01-24 10:23:01.281|0 |0 |0 |nan |
|AAGN7022205|2024-01-24 10:33:27.103|0 |0 |1 |nan |
|AAGN7022205|2024-01-24 10:38:05.096|1 |0 |1 |start |
|AAGN7022205|2024-01-24 11:23:22.652|1 |1 |1 |nan |
|AAGN7022205|2024-01-24 11:34:59.031|1 |1 |1 |nan |
|AAGN7022205|2024-01-24 12:13:04.285|1 |1 |0 |end |
|AAGN7022205|2024-01-24 12:13:34.285|0 |1 |NULL |nan |
+-----------+-----------------------+------+-------------+-----------+----------------+
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
|CpoSku |DateUpdated |status|lagged_status|lead_status|start_end_column|group_id|
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
|AAGN7013005|2024-01-24 10:32:06.898|0 |NULL |1 |nan |0 |
|AAGN7013005|2024-01-24 10:37:05.09 |1 |0 |1 |start |1 |
|AAGN7013005|2024-01-24 12:12:56.825|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 12:18:01.647|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 13:18:18.456|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 15:00:22.534|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 15:06:04.075|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 16:09:04.796|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 16:14:01.193|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 22:30:06.217|1 |1 |1 |nan |1 |
|AAGN7013005|2024-01-24 23:37:16.612|1 |1 |0 |end |1 |
|AAGN7013005|2024-01-24 23:43:04.639|0 |1 |0 |nan |1 |
|AAGN7013005|2024-01-25 03:03:03.796|0 |0 |1 |nan |1 |
|AAGN7013005|2024-01-25 03:08:28.834|1 |0 |1 |start |2 |
|AAGN7013005|2024-01-25 04:05:43.995|1 |1 |0 |end |2 |
|AAGN7013005|2024-01-25 04:10:45.93 |0 |1 |0 |nan |2 |
|AAGN7013005|2024-01-25 06:33:50.742|0 |0 |0 |nan |2 |
|AAGN7013005|2024-01-25 07:48:09.229|0 |0 |NULL |nan |2 |
|AAGN7022205|2024-01-24 09:39:30.167|0 |NULL |0 |nan |0 |
|AAGN7022205|2024-01-24 09:44:56.294|0 |0 |0 |nan |0 |
|AAGN7022205|2024-01-24 10:23:01.281|0 |0 |0 |nan |0 |
|AAGN7022205|2024-01-24 10:33:27.103|0 |0 |1 |nan |0 |
|AAGN7022205|2024-01-24 10:38:05.096|1 |0 |1 |start |1 |
|AAGN7022205|2024-01-24 11:23:22.652|1 |1 |1 |nan |1 |
|AAGN7022205|2024-01-24 11:34:59.031|1 |1 |1 |nan |1 |
|AAGN7022205|2024-01-24 12:13:04.285|1 |1 |0 |end |1 |
|AAGN7022205|2024-01-24 12:13:34.285|0 |1 |NULL |nan |1 |
+-----------+-----------------------+------+-------------+-----------+----------------+--------+
+-----------+------+--------+-----------------------+-----------------------+
|CpoSku |status|group_id|start_end |end_date |
+-----------+------+--------+-----------------------+-----------------------+
|AAGN7013005|1 |1 |2024-01-24 10:37:05.09 |2024-01-24 23:37:16.612|
|AAGN7013005|1 |2 |2024-01-25 03:08:28.834|2024-01-25 04:05:43.995|
|AAGN7022205|1 |1 |2024-01-24 10:38:05.096|2024-01-24 12:13:04.285|
+-----------+------+--------+-----------------------+-----------------------+
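The question also asks for the duration of each status-1 window in seconds and minutes. That part is not covered by the code above, but a minimal sketch building on its final result (assuming the aggregated DataFrame is still called sample_df, with timestamp columns start_end and end_date) could look like this:

# Hypothetical follow-up, not part of the original answer: derive the window
# duration from the aggregated start/end timestamps.
duration_df = sample_df.withColumn(
    # casting a timestamp to long yields seconds since the Unix epoch
    "duration_seconds",
    F.col("end_date").cast("long") - F.col("start_end").cast("long"),
).withColumn(
    "duration_minutes",
    F.round(F.col("duration_seconds") / 60, 2),
)
duration_df.show(truncate=False)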
Try the code below. Note that this answer assumes you will always have a matching start and stop pair.
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, when, row_number, first

# Lag/lead of the status within each SKU, ordered by time
w0 = Window.partitionBy("CpoSku").orderBy("DateUpdated")
df_test = df_test.withColumn("lag", lag(col("CPO_BB_Status")).over(w0))
df_test = df_test.withColumn("lead", lead(col("CPO_BB_Status")).over(w0))

# Mark the rows where a status-1 window starts (0 -> 1) or stops (1 -> 0)
df_test = df_test.withColumn(
    "cond",
    when((col("CPO_BB_Status") == 1) & (col("lag") == 0), "start")
    .otherwise(when((col("CPO_BB_Status") == 1) & (col("lead") == 0), "stop")),
)
df_test.show(truncate=False)

# Keep only the start/stop rows
df_test1 = df_test.filter((col("cond") == "start") | (col("cond") == "stop"))
df_test1.show(truncate=False)
df_test1 = df_test1.select("CpoSku", "DateUpdated", "cond")

# Number the rows so each consecutive start/stop pair shares the same id
windowSpec = Window.orderBy("CpoSku", "DateUpdated")
df_test2 = df_test1.withColumn(
    "sequential_column", ((row_number().over(windowSpec) - 1) / 2 + 1).cast("int")
)
df_test2.show()

# Pivot so every window becomes one row with a start and a stop timestamp
df_test2 = df_test2.groupBy("CpoSku", "sequential_column").pivot("cond").agg(first("DateUpdated"))
df_test2.show(truncate=False)
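If the durations requested in the question are needed here as well, the same idea as in the first answer applies to the pivoted columns. A minimal sketch, assuming the pivot above produced timestamp columns named start and stop:

from pyspark.sql.functions import unix_timestamp

# Hypothetical follow-up, not part of the original answer: duration per window,
# assuming "start" and "stop" are timestamp columns produced by the pivot above.
df_test3 = df_test2.withColumn(
    "duration_seconds", unix_timestamp(col("stop")) - unix_timestamp(col("start"))
).withColumn("duration_minutes", col("duration_seconds") / 60)
df_test3.show(truncate=False)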