Pyspark按组填充缺少的日期并填充以前的值

2024-04-25 07:28:57 发布

您现在位置:Python中文网/ 问答频道 /正文

Spark版本3.0

我有两个数据帧

我使用日期范围创建了一个带有日期列的数据框

我有第二个spark数据框,包含公司名称、日期和价值

我想合并按公司分组的DF2到DF1,这样我可以填充缺少的日期,也可以填充上一行中缺少的值

我该怎么做?我曾考虑过左键连接,但似乎效果不太好

enter image description here


Tags: 数据版本名称公司sparkdf1左键df2
1条回答
网友
1楼 · 发布于 2024-04-25 07:28:57

试试这个。有点复杂

import pyspark.sql.functions as f
from pyspark.sql import Window

df1 = spark.read.option("header","true").option("inferSchema","true").csv("test1.csv") \
  .withColumn('Date', f.to_date('Date', 'dd/MM/yyyy'))
df2 = spark.read.option("header","true").option("inferSchema","true").csv("test2.csv") \
  .withColumn('Date', f.to_date('Date', 'dd/MM/yyyy'))

w1 = Window.orderBy('Company', 'Date')
w2 = Window.orderBy('Company', 'Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w3 = Window.partitionBy('partition').orderBy('Company', 'Date')

df1.crossJoin(df2.select('Company').distinct()) \
   .join(df2, ['Company', 'Date'], 'left') \
   .withColumn('range', (f.col('Value').isNull() | f.lead(f.col('Value'), 1, 0).over(w1).isNull()) != f.col('Value').isNull()) \
   .withColumn('partition', f.sum(f.col('range').cast('int')).over(w2)) \
   .withColumn('fill', f.first('Value').over(w3)) \
   .orderBy('Company', 'Date') \
   .selectExpr('Company', 'Date', 'coalesce(Value, fill) as Value') \
   .show(20, False)

+   -+     +  -+
|Company|Date      |Value|
+   -+     +  -+
|A      |2000-01-01|13   |
|A      |2000-01-02|14   |
|A      |2000-01-03|15   |
|A      |2000-01-04|19   |
|A      |2000-01-05|19   |
|A      |2000-01-06|19   |
|A      |2000-01-07|19   |
|A      |2000-01-08|19   |
|A      |2000-01-09|19   |
|B      |2000-01-01|19   |
|B      |2000-01-02|19   |
|B      |2000-01-03|20   |
|B      |2000-01-04|25   |
|B      |2000-01-05|23   |
|B      |2000-01-06|24   |
|B      |2000-01-07|24   |
|B      |2000-01-08|24   |
|B      |2000-01-09|24   |
+   -+     +  -+

您可以通过多次添加.show来查看每一行发生了什么,这可能会有所帮助

相关问题 更多 >