Pyspark“递归”函数涉及最后一天

2024-04-25 06:50:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在pyspark中开发一个进程,我有一个数据帧,我试图再添加一列(使用withColumn方法)。在

问题是公式是:

STATUS1 = If 'PETP-today' > 0 then 'Status1 last day' + 'PETP-today' else 0

Status1的每个结果都涉及最后一天结果的Status1。在

我发现的一个解决方案是创建一个pandas数据帧,然后一个一个地运行记录,直到我可以使用变量计算每个记录为止。但是我会有性能问题。你能帮忙吗?在

考虑dataframe列:Date(daily)/PETP(Float)/STATUS1?(浮动)

我真的很感谢你的帮助!在


Tags: 数据方法todayif进程记录pyspark公式
1条回答
网友
1楼 · 发布于 2024-04-25 06:50:33

我认为解决方案的关键是lag函数。试试这个(为了简单起见,我假设所有列的数据都是整数):

首先,将列上移一天

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Window

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

columns = ['date', 'petp', 'status']
data = [(0, 0, 0), (1, 1, 1), (2, 2, 2), (3,3,3), (4,4,4), (5,5,5)]
pd_data = pd.DataFrame.from_records(data=data, columns=columns)
spark_data = spark.createDataFrame(pd_data)

spark_data_with_lag = spark_data.withColumn("status_last_day", F.lag("status", 1, 0).over(Window.orderBy("date")))
spark_data_with_lag.show()

+  +  +   +       -+
|date|petp|status|status_last_day|
+  +  +   +       -+
|   1|   1|     1|              0|
|   2|   2|     2|              1|
|   3|   3|     3|              2|
|   4|   4|     4|              3|
|   5|   5|     5|              4|
+  +  +   +       -+

然后在条件中使用该数据

^{pr2}$

我希望这就是你想要的。在

相关问题 更多 >

    热门问题