Pyspark SQL查询以获取特定列+/20%的行

2024-04-26 04:39:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下资料:

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137|   16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202|  481045| 241788|
|201522369349300207|  700861|1185640|
|201522369349300227|  178479| 267976|
+------------------+--------+-------+

对于每一行,我希望能够得到资产金额20%以内的行。例如,对于第一行(ID=201542399349300619),我希望能够获得资产在1633944的20%+/-范围内(因此在1307155到1960732之间)的所有行:

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201522369349300122| 1401406|1010828|

使用这个子集表,我想得到平均资产并将其添加为一个新列。因此,对于上述示例,它将是(1633944+1401406)=1517675的平均资产

+------------------+--------+-------+---------+
|                ID|  Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944|  32850|  1517675|

Tags: id示例资产金额子集资料assetsrevenue
1条回答
网友
1楼 · 发布于 2024-04-26 04:39:48

假设您的数据帧具有与以下类似的模式(即AssetsRevenue是数字):

df.printSchema()
#root
# |  ID: long (nullable = true)
# |  Assets: integer (nullable = true)
# |  Revenue: integer (nullable = true)

您可以join在您所设定的条件下将数据帧转换为自身。连接之后,可以通过取Assets列的平均值来进行分组和聚合。你知道吗

例如:

from pyspark.sql.functions import avg, expr

df.alias("l")\
    .join(
        df.alias("r"), 
        on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
    )\
    .groupBy("l.ID", "l.Assets", "l.Revenue")\
    .agg(avg("r.Assets").alias("AvgAssets"))\
    .show()
#+         +    +   -+         +
#|                ID|  Assets|Revenue|         AvgAssets|
#+         +    +   -+         +
#|201542399349300629| 3979760| 850914|         3691223.5|
#|201522369349300202|  481045| 241788|          481045.0|
#|201522369349300207|  700861|1185640|          700861.0|
#|201522369349300137|   16948| 171534|           16948.0|
#|201522369349300142|13474056|2285323|       1.3474056E7|
#|201522369349300227|  178479| 267976|          178479.0|
#|201542399349300619| 1633944|  32850|         1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553|         1138291.0|
#|201542399349300634| 3402687|1983568|         3691223.5|
#+         +    +   -+         +

由于我们将数据帧连接到自身,因此可以使用别名来引用左表("l")和右表("r")。上面的逻辑是在r中的资产是l中资产的+/20%的条件下,将l连接到r。你知道吗

有多种方法可以表示+/20%条件,但我使用sparksqlbetween表达式来查找介于Assets * 0.8Assets * 1.2之间的行。你知道吗

然后我们对左表的所有列(groupBy)进行聚合,并对右表中的资产进行平均。你知道吗

生成的AvgAssets列是FloatType列,但如果您喜欢,可以通过在.alias("AvgAssets")之前添加.cast("int")将其轻松转换为IntegerType。你知道吗


另请参见:

相关问题 更多 >