pyspark递归行链转换到scala

2024-06-17 10:20:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我编写了一个pyspark实现,它通过逐行读取来按顺序递增(递归)乘以列值。由于我们这边的平台限制,我现在需要在没有UDAF的情况下将其转换为Scala。我看了看at this implementation,但随着年月数的增长,它会占用很长时间,因为它需要像年月数一样的临时表

大约有100个年头和70个部门给出了这个数据框中的行总数为7000。我们需要计算每个部门的起始值(按序列中第一年的月份),并将其与下一行的值相乘。得到的乘以因子需要与下一行相乘,依此类推

示例数据:

department, productivity_ratio, year_month
101,1.00,2013-01-01
101,0.98,2013-02-01
101,1.01,2013-03-01
101,0.99,2013-04-01
...
102,1.00,2013-01-01
102,1.02,2013-02-01
102,0.96,2013-03-01
...

预期结果:

department,productivity_ratio,year_month,chained_productivity_ratio
101,1.00,2013-01-01,1.00
101,0.98,2013-02-01,0.98   (1.00*0.98)
101,1.01,2013-03-01,0.9898 (1.00*0.98*1.01)
101,0.99,2013-04-01,0.9799 (1.00*0.98*1.01*0.99)
...
102,1.00,2013-01-01,1.00   (reset to 1.00 as starting point as department name changed in sequence)
102,1.02,2013-02-01,1.02   (1.00*1.02)
102,0.96,2013-03-01,0.9792 (1.00*1.02*0.96)
...

在scala中,是否有任何方法可以更快地实现这一点,或者将其转换为部门间的循环,并将生产率比率视为与先前值相乘的序列,或者将数据帧更改为不同的数据结构,以避免出现分布式排序问题

现有Pypark代码:

%pyspark
import pandas as pd
import numpy as np
import StringIO

inputParquet = "s3://path/to/parquet/files/"
inputData = spark.read.parquet(inputParquet)
inputData.printSchema

root
|-- department: string
|-- productivity_ratio: double
|-- year_month: date

inputSorted=inputData.sort('department', 'year_month')
inputSortedNotnull=inputSorted.dropna()
finalInput=inputSortedNotnull.toPandas()

prev_dept = 999
prev_productivity_ratio = 1

new_productivity_chained = []

for t in finalInput.itertuples():
    if prev_dept == t[1]:
        new_productivity_chained.append(t[2] * prev_productivity_ratio)
        prev_productivity_ratio = t[2] * prev_productivity_ratio
    else:
        prev_productivity_ratio = 1
        new_productivity_chained.append(prev_productivity_ratio)
    prev_dept = t[1]

productivityChained = finalInput.assign(chained_productivity=new_productivity_chained)

Tags: 数据importnewasyearproductivity部门department
1条回答
网友
1楼 · 发布于 2024-06-17 10:20:06

您可以使用window lag函数和执行exp(sum(log(<column>)))来计算chained_productivity_ratio,我们使用的所有函数都是spark inbuilt functions性能将非常好


Example:

In Pyspark:

df.show()
#+     +         +     +
#|department|productivity_ratio|year_month|
#+     +         +     +
#|       101|              1.00|2013-01-01|
#|       101|              0.98|2013-02-01|
#|       101|              1.01|2013-03-01|
#|       101|              0.99|2013-04-01|
#|       102|              1.00|2013-01-01|
#|       102|              1.02|2013-02-01|
#|       102|              0.96|2013-03-01|
#+     +         +     +

from pyspark.sql.functions import *
from pyspark.sql import Window

w = Window.partitionBy("department").orderBy("year_month")

df.withColumn("chained_productivity_ratio",exp(sum(log(col("productivity_ratio"))).over(w))).show()
#+     +         +     +             +
#|department|productivity_ratio|year_month|chained_productivity_ratio|
#+     +         +     +             +
#|       101|              1.00|2013-01-01|                       1.0|
#|       101|              0.98|2013-02-01|                      0.98|
#|       101|              1.01|2013-03-01|                    0.9898|
#|       101|              0.99|2013-04-01|        0.9799019999999999|
#|       102|              1.00|2013-01-01|                       1.0|
#|       102|              1.02|2013-02-01|                      1.02|
#|       102|              0.96|2013-03-01|                    0.9792|
#+     +         +     +             +

In Scala:

import org.apache.spark.sql.functions._

import org.apache.spark.sql.expressions._

val w = Window.partitionBy("department").orderBy("year_month")

df.withColumn("chained_productivity_ratio",exp(sum(log(col("productivity_ratio"))).over(w))).show()

//+     +         +     +             +
//|department|productivity_ratio|year_month|chained_productivity_ratio|
//+     +         +     +             +
//|       101|              1.00|2013-01-01|                       1.0|
//|       101|              0.98|2013-02-01|                      0.98|
//|       101|              1.01|2013-03-01|                    0.9898|
//|       101|              0.99|2013-04-01|        0.9799019999999999|
//|       102|              1.00|2013-01-01|                       1.0|
//|       102|              1.02|2013-02-01|                      1.02|
//|       102|              0.96|2013-03-01|                    0.9792|
//+     +         +     +             +

相关问题 更多 >