I have a PySpark dataframe that looks like this:
import pandas as pd
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
spark.conf.set("spark.sql.session.timeZone", "UTC")

INPUT = {
    "idx": [1, 1, 1, 1, 0],
    "consumption": [10.0, 20.0, 30.0, 40.0, 5.0],
    "valid_from": [
        pd.Timestamp("2019-01-01 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-02 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-03 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-06 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-01 00:00:00+00:00", tz="UTC"),
    ],
    "valid_to": [
        pd.Timestamp("2019-01-02 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-05 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-05 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-08 00:00:00+00:00", tz="UTC"),
        pd.Timestamp("2019-01-02 00:00:00+00:00", tz="UTC"),
    ],
}

df = pd.DataFrame.from_dict(INPUT)
spark.createDataFrame(df).show()
>>>
+---+-----------+-------------------+-------------------+
|idx|consumption|         valid_from|           valid_to|
+---+-----------+-------------------+-------------------+
|  1|       10.0|2019-01-01 00:00:00|2019-01-02 00:00:00|
|  1|       20.0|2019-01-02 00:00:00|2019-01-05 00:00:00|
|  1|       30.0|2019-01-03 00:00:00|2019-01-05 00:00:00|
|  1|       40.0|2019-01-06 00:00:00|2019-01-08 00:00:00|
|  0|        5.0|2019-01-01 00:00:00|2019-01-02 00:00:00|
+---+-----------+-------------------+-------------------+
I simply want to sum the consumption over the overlapping interval slices, per idx:
+---+-------------------+-----------+
|idx|          timestamp|consumption|
+---+-------------------+-----------+
|  1|2019-01-01 00:00:00|       10.0|
|  1|2019-01-02 00:00:00|       20.0|
|  1|2019-01-03 00:00:00|       50.0|
|  1|2019-01-04 00:00:00|       50.0|
|  1|2019-01-05 00:00:00|        0.0|
|  1|2019-01-06 00:00:00|       40.0|
|  1|2019-01-07 00:00:00|       40.0|
|  1|2019-01-08 00:00:00|        0.0|
|  0|2019-01-01 00:00:00|        5.0|
|  0|2019-01-02 00:00:00|        0.0|
+---+-------------------+-----------+
You can use sequence to expand each interval into a list of individual days, explode that list, and then sum the consumption per timestamp and idx.
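The answer's original code block did not survive extraction; below is a minimal sketch of the approach just described (sequence, explode, sum, then a full join on the interval end dates). The names sdf, days, summed, ends, and result are illustrative, not from the original answer:

import pyspark.sql.functions as F

sdf = spark.createDataFrame(df)

# 1) Expand every interval into one row per day. sequence() is inclusive
#    on both ends, so one day is subtracted from valid_to first
#    (see the notes below).
days = sdf.withColumn(
    "timestamp",
    F.explode(
        F.sequence(
            F.col("valid_from"),
            F.col("valid_to") - F.expr("INTERVAL 1 DAY"),
            F.expr("INTERVAL 1 DAY"),
        )
    ),
)

# 2) Sum the consumption of all intervals that cover the same day.
summed = days.groupBy("idx", "timestamp").agg(
    F.sum("consumption").alias("consumption")
)

# 3) Full join against the distinct interval end dates and fill the
#    resulting nulls with 0.0, so each interval's end date appears
#    in the result.
ends = sdf.select("idx", F.col("valid_to").alias("timestamp")).distinct()
result = (
    summed.join(ends, on=["idx", "timestamp"], how="full")
    .fillna(0.0, subset=["consumption"])
    .orderBy("idx", "timestamp")
)
result.show()

Up to row order, result.show() reproduces the expected table from the question.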
Notes:
- sequence includes the last value of the interval, so one day must be subtracted from valid_to before expanding.
- The full join against the valid_to values, with null filled by 0.0, restores the interval end dates that would otherwise be missing.
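As a quick way to see the inclusivity mentioned in the first note, a one-line check (assuming the spark session from the question):

# sequence() is inclusive on both ends: this returns three timestamps,
# 2019-01-01, 2019-01-02 and 2019-01-03.
spark.sql(
    "SELECT sequence(TIMESTAMP '2019-01-01 00:00:00', "
    "TIMESTAMP '2019-01-03 00:00:00', INTERVAL 1 DAY) AS days"
).show(truncate=False)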