我在pyspark
中有一个数据帧,如下所示。你知道吗
df = sqlContext.createDataFrame(
[
("101", "99.10", "2019-06-04"),
("102", "89.27", "2019-06-04"),
("102", "89.10", "2019-03-04"),
("103", "73.11", "2019-09-10"),
("101", "-69.81", "2019-09-11"),
("101", "12.51", "2018-12-14"),
("101", "43.23", "2018-09-11")
],
("user_id", "amount", "trans_date"))
我想知道以下事情
1)首先在数据帧中找到max of trans_date
,并存储为变量。我在下面做的时候得到的
from pyspark.sql import functions as f
from pyspark.sql import Window
max_date=df.groupby().agg(f.max('trans_date')).collect()[0].asDict()['max(trans_date)']
2)每个user_id
的amount
总和
df1=df.withColumn('Balance', f.sum('amount').over(Window.partitionBy('user_id')))
3)现在使用这个max_date
我想创建一些列并填充值,例如
`days_0_30` if `trans_date` is between `max_date` and `30 days before
`days_31_60` if `trans_date` is between `max_date - 31 days` and `60 days before
and so on. I am able to do it using below
df2 = df1.withColumn(days_0_30, f.when((df1.trans_date <= '2019-09-11') & (df1.trans_date >= '2019-06-11'), df1.Balance).otherwise('null')).withColumn(days_31_60, f.when((df1.trans_date <= '2019-06-10') & (df1.trans_date >= '2019-03-11'), df1.Balance).otherwise('null')).withColumn(days_61_90, f.when((df1.trans_date <= '2019-03-10') & (df1.trans_date >= '2018-12-11'), df1.Balance).otherwise('null')).withColumn(days_91_120, f.when((df1.trans_date <= '2018-12-10') & (df1.trans_date >= '2018-09-11'), df1.Balance).otherwise('null')).withColumn(days_121_150, f.when((df1.trans_date <= '2018-09-10') & (df1.trans_date >= '2018-06-11'), df1.Balance).otherwise('null'))
Ifdate is not in range defined
值应为null
但我在上面有硬编码的日期。我希望这能达到我的目的。你知道吗
我应该怎么做才能以一种更优雅的方式达到我的结果呢
仅使用列表理解:
按照步骤获取df1(确保列
trans_date
是DateType())然后使用列表理解创建一个包含三个元素的元组列表(range\u name、range\u start\u date、range\u end\u date)
现在使用列表理解生成这些新列:
注意:看起来您的代码中使用的间隔是90天,而不是30天。但我相信,根据您的需要调整上述代码对您来说很简单。你知道吗
相关问题 更多 >
编程相关推荐