如何在PySpark中按两列分组并计算每组的平均总值

1 投票
2 回答
38 浏览
提问于 2025-04-12 16:26

我有一个数据表(DataFrame),现在想用Pyspark来得到以下几个结果:

  1. 每个起点的总费用
  2. 每个起点的小费总额
  3. 每个起点的平均等待时间
  4. 每个终点的平均等待时间
起点 终点 费用 小费 等待时间
1 1 4.00 4.00 1.00
1 2 5.00 10.00 8.00
1 2 5.00 15.00 12.00
3 2 11.00 12.00 17.00
3 5 41.00 25.00 13.00
4 6 50.00 70.00 2.00

我现在的查询代码是这样的:

from pyspark.sql import functions as func
from pyspark.sql.functions import desc

data = [
    (1, 1, 4.00, 4.00, 1.00),
    (1, 2, 5.00, 10.00, 8.00),
    (1, 2, 5.00, 15.00, 12.00),
    (3, 2, 11.00, 12.00, 17.00),
    (3, 5, 41.00, 25.00, 13.00),
    (4, 6, 50.00, 70.00, 2.00)
]

columns = ["Pick", "Drop", "Fare", "Tip", "Drag"]
df = spark.createDataFrame(data, columns)


df.groupBy('Pick', 'Drop') \
    .agg(
        func.sum('Fare').alias('FarePick'),
        func.sum('Tip').alias('TipPick'),
        func.avg('Drag').alias('AvgDragPick'),
        func.avg('Drag').alias('AvgDragDrop')) \
    .orderBy('Pick').show()

不过,我觉得这可能不太对。我在处理第(4)项时有点卡住,因为分组的方式似乎不太正确。有没有人能给我一些建议来修正这个问题?我希望输出结果能放在一个表格里,格式如下:

起点 终点 起点费用总额 起点小费总额 起点平均等待时间 终点平均等待时间

2 个回答

0

我把你的表格数据放进了一个叫 data 的变量里,并把这四个步骤分开了。

from pyspark.sql import SparkSession
from pyspark.sql import functions as func

spark = SparkSession.builder \
    .appName("testSession") \
    .getOrCreate()

data = [
    (1, 1, 4.00, 4.00, 1.00),
    (1, 2, 5.00, 10.00, 8.00),
    (1, 2, 5.00, 15.00, 12.00),
    (3, 2, 11.00, 12.00, 17.00),
    (3, 5, 41.00, 25.00, 13.00),
    (4, 6, 50.00, 70.00, 2.00)
]

columns = ["Pick", "Drop", "Fare", "Tip", "Drag"]
df = spark.createDataFrame(data, columns)

# 1 and 2 and 3
df.groupBy('Pick').agg(
    func.sum('Fare').alias('TotalFarePick'),
    func.sum('Tip').alias('TotalTipPick'),
    func.avg('Drag').alias('AvgDragPick')
).orderBy('Pick').show()

# 4
df.groupBy('Drop').agg(
    func.avg('Drag').alias('AvgDragDrop')
).orderBy('Drop').show()

spark.stop()

这两个表格的输出结果:

+----+-------------+------------+-----------+
|Pick|TotalFarePick|TotalTipPick|AvgDragPick|
+----+-------------+------------+-----------+
|   1|         14.0|        29.0|        7.0|
|   3|         52.0|        37.0|       15.0|
|   4|         50.0|        70.0|        2.0|
+----+-------------+------------+-----------+

+----+------------------+
|Drop|       AvgDragDrop|
+----+------------------+
|   1|               1.0|
|   2|12.333333333333334|
|   5|              13.0|
|   6|               2.0|
+----+------------------+
0

如果你想在结果中包含所有列,可以使用窗口函数。

from pyspark.sql import functions as f
from pyspark.sql import Window

data = [
    (1, 1, 4.00, 4.00, 1.00),
    (1, 2, 5.00, 10.00, 8.00),
    (1, 2, 5.00, 15.00, 12.00),
    (3, 2, 11.00, 12.00, 17.00),
    (3, 5, 41.00, 25.00, 13.00),
    (4, 6, 50.00, 70.00, 2.00)
]

columns = ["Pick", "Drop", "Fare", "Tip", "Drag"]
df = spark.createDataFrame(data, columns)

df_new = (
    df
    .withColumn("TotalFarePick", f.sum("Fare").over(Window.partitionBy("Pick")))
    .withColumn("TotalTipPick", f.sum("Tip").over(Window.partitionBy("Pick")))
    .withColumn("AvgDragPick", f.avg("Drag").over(Window.partitionBy("Pick")))
    .withColumn("AvgDragDrop", f.avg("Drag").over(Window.partitionBy("Drop")))
    .drop("Fare", "Tip", "Drag")
)

df_new.show()

另外,请不要使用 \,因为在新的 Python 版本中不推荐使用它。

你可以查看这个链接了解更多信息:https://peps.python.org/pep-0008/ :

推荐的处理长行的方法是使用 Python 的隐式换行,也就是在括号、方括号和大括号内换行。长行可以通过在括号内包裹表达式来分成多行。相比使用反斜杠来换行,这种方法更受欢迎。

撰写回答