如何使用pysp高效地实现groupby和后续的滚动应用

2024-04-20 08:32:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一段时间的购买数据。每次购买都属于一个用户和一个子组。此外,每次购买都有一个日期。我想实现的是对用户和子组的每个组合的利润列应用滚动函数(使用日期)

下面我创建了一个独立的示例,其中包含了我的问题的pandas解决方案和我的问题的pyspark解决方案。我现在的问题是:在pyspark中有没有更有效的方法来实现这一点?

独立示例:

# Create some dummy date
date_today = datetime.now()
days = pd.date_range(date_today, date_today + timedelta(250), freq='D')

np.random.seed(seed=1111)
data = np.random.randint(1, high=100, size=len(days))
df1 = pd.DataFrame({'dates': days, 'profit': data,'user':1,'subgroup':np.random.randint(0,5,len(days))})

days = pd.date_range(date_today+ timedelta(hours = 5), date_today + timedelta(130), freq='D')
data = np.random.randint(1, high=100, size=len(days))
df2 = pd.DataFrame({'dates': days, 'profit': data,'user':2,'subgroup':np.random.randint(0,5,len(days))})

df_all = pd.concat([df1,df2])

熊猫解决方案:

df_agg = df_all.groupby(by = ["user","subgroup"]).apply(lambda df: df.set_index('dates').rolling("20d").agg({"profit":[np.mean,np.max]}))

慢火花解决方案:

sdf_all = spark.createDataFrame(df_all)

days = lambda i: i * 86400 
sdf_output = (sdf_all.withColumn("profit_mean",F.avg(F.col("profit"))
                                        .over(Window.partitionBy("user","subgroup")
                                        .orderBy(F.col("dates").cast("long"))
                                        .rangeBetween(-days(20),0)))
                     .withColumn("profit_max",F.max(F.col("profit"))
                                        .over(Window.partitionBy("user","subgroup")
                                        .orderBy(F.col("dates").cast("long"))
                                        .rangeBetween(-days(20),0)))

             )

Tags: dfdatatodaydatenprandomall解决方案