如何在PySpark中按两列分组并计算每组的平均总值
我有一个数据表(DataFrame),现在想用Pyspark来得到以下几个结果:
- 每个起点的总费用
- 每个起点的小费总额
- 每个起点的平均等待时间
- 每个终点的平均等待时间
起点 | 终点 | 费用 | 小费 | 等待时间 |
---|---|---|---|---|
1 | 1 | 4.00 | 4.00 | 1.00 |
1 | 2 | 5.00 | 10.00 | 8.00 |
1 | 2 | 5.00 | 15.00 | 12.00 |
3 | 2 | 11.00 | 12.00 | 17.00 |
3 | 5 | 41.00 | 25.00 | 13.00 |
4 | 6 | 50.00 | 70.00 | 2.00 |
我现在的查询代码是这样的:
from pyspark.sql import functions as func
from pyspark.sql.functions import desc
data = [
(1, 1, 4.00, 4.00, 1.00),
(1, 2, 5.00, 10.00, 8.00),
(1, 2, 5.00, 15.00, 12.00),
(3, 2, 11.00, 12.00, 17.00),
(3, 5, 41.00, 25.00, 13.00),
(4, 6, 50.00, 70.00, 2.00)
]
columns = ["Pick", "Drop", "Fare", "Tip", "Drag"]
df = spark.createDataFrame(data, columns)
df.groupBy('Pick', 'Drop') \
.agg(
func.sum('Fare').alias('FarePick'),
func.sum('Tip').alias('TipPick'),
func.avg('Drag').alias('AvgDragPick'),
func.avg('Drag').alias('AvgDragDrop')) \
.orderBy('Pick').show()
不过,我觉得这可能不太对。我在处理第(4)项时有点卡住,因为分组的方式似乎不太正确。有没有人能给我一些建议来修正这个问题?我希望输出结果能放在一个表格里,格式如下:
起点 | 终点 | 起点费用总额 | 起点小费总额 | 起点平均等待时间 | 终点平均等待时间 |
---|---|---|---|---|---|
2 个回答
0
我把你的表格数据放进了一个叫 data
的变量里,并把这四个步骤分开了。
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
spark = SparkSession.builder \
.appName("testSession") \
.getOrCreate()
data = [
(1, 1, 4.00, 4.00, 1.00),
(1, 2, 5.00, 10.00, 8.00),
(1, 2, 5.00, 15.00, 12.00),
(3, 2, 11.00, 12.00, 17.00),
(3, 5, 41.00, 25.00, 13.00),
(4, 6, 50.00, 70.00, 2.00)
]
columns = ["Pick", "Drop", "Fare", "Tip", "Drag"]
df = spark.createDataFrame(data, columns)
# 1 and 2 and 3
df.groupBy('Pick').agg(
func.sum('Fare').alias('TotalFarePick'),
func.sum('Tip').alias('TotalTipPick'),
func.avg('Drag').alias('AvgDragPick')
).orderBy('Pick').show()
# 4
df.groupBy('Drop').agg(
func.avg('Drag').alias('AvgDragDrop')
).orderBy('Drop').show()
spark.stop()
这两个表格的输出结果:
+----+-------------+------------+-----------+
|Pick|TotalFarePick|TotalTipPick|AvgDragPick|
+----+-------------+------------+-----------+
| 1| 14.0| 29.0| 7.0|
| 3| 52.0| 37.0| 15.0|
| 4| 50.0| 70.0| 2.0|
+----+-------------+------------+-----------+
+----+------------------+
|Drop| AvgDragDrop|
+----+------------------+
| 1| 1.0|
| 2|12.333333333333334|
| 5| 13.0|
| 6| 2.0|
+----+------------------+
0
如果你想在结果中包含所有列,可以使用窗口函数。
from pyspark.sql import functions as f
from pyspark.sql import Window
data = [
(1, 1, 4.00, 4.00, 1.00),
(1, 2, 5.00, 10.00, 8.00),
(1, 2, 5.00, 15.00, 12.00),
(3, 2, 11.00, 12.00, 17.00),
(3, 5, 41.00, 25.00, 13.00),
(4, 6, 50.00, 70.00, 2.00)
]
columns = ["Pick", "Drop", "Fare", "Tip", "Drag"]
df = spark.createDataFrame(data, columns)
df_new = (
df
.withColumn("TotalFarePick", f.sum("Fare").over(Window.partitionBy("Pick")))
.withColumn("TotalTipPick", f.sum("Tip").over(Window.partitionBy("Pick")))
.withColumn("AvgDragPick", f.avg("Drag").over(Window.partitionBy("Pick")))
.withColumn("AvgDragDrop", f.avg("Drag").over(Window.partitionBy("Drop")))
.drop("Fare", "Tip", "Drag")
)
df_new.show()
另外,请不要使用 \
,因为在新的 Python 版本中不推荐使用它。
你可以查看这个链接了解更多信息:https://peps.python.org/pep-0008/ :
推荐的处理长行的方法是使用 Python 的隐式换行,也就是在括号、方括号和大括号内换行。长行可以通过在括号内包裹表达式来分成多行。相比使用反斜杠来换行,这种方法更受欢迎。