火花窗功能-日期范围

2024-05-15 08:32:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个带数据的Spark SQL DataFrame,我想要得到的是给定日期范围内当前行之前的所有行。例如,我希望所有的行都在给定行之前7天。我发现我需要使用一个Window Function类似的:

Window \
    .partitionBy('id') \
    .orderBy('start')

问题来了。我想有一个rangeBetween7天,但在Spark文档中我找不到任何关于这个的信息。Spark甚至提供了这样的选择吗?现在,我要把前面的所有行与:

.rowsBetween(-sys.maxsize, 0)

但希望实现如下目标:

.rangeBetween("7 days", 0)

如果有人能在这件事上帮我,我将非常感激。提前谢谢!


Tags: 数据文档信息iddataframesqlfunctionwindow
1条回答
网友
1楼 · 发布于 2024-05-15 08:32:32

火花>;=2.3

由于Spark 2.3可以使用SQL API来使用interval对象,但是DataFrameAPI的支持是still work in progress

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

火花<;2.3

据我所知,无论是在星火还是蜂巢中,这都是不可能的。两者都要求与RANGE一起使用的ORDER BY子句是数字的。我找到的最接近的东西是转换为时间戳并以秒为单位操作。假设start列包含date类型:

from pyspark.sql import Row

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

小助手和窗口定义:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

最终查询:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

虽然不漂亮但很管用。


*Hive Language Manual, Types

相关问题 更多 >