How to drop rows matching a condition within each group in PySpark

Posted 2024-05-15 08:26:41


I have a DataFrame named df with the following contents:

accountname |   clustername |   namespace   |   cost
account1    |   cluster_1_1 |   ns_1_1      |   10
account1    |   cluster_1_1 |   ns_1_2      |   11
account1    |   cluster_1_1 |   infra       |   12
account1    |   cluster_1_2 |   infra       |   12
account2    |   cluster_2_1 |   infra       |   13
account3    |   cluster_3_1 |   ns_3_1      |   10
account3    |   cluster_3_1 |   ns_3_2      |   11
account3    |   cluster_3_1 |   infra       |   12

I need to group df by the accountname field and, within each accountname, filter on clustername as follows: when a clustername has more than one row for its accountname, drop the row where namespace = infra; when a clustername has only a single row for its accountname, keep it. The expected result is:

accountname |   clustername |   namespace   |   cost
account1    |   cluster_1_1 |   ns_1_1      |   10
account1    |   cluster_1_1 |   ns_1_2      |   11
account1    |   cluster_1_2 |   infra       |   12
account2    |   cluster_2_1 |   infra       |   13
account3    |   cluster_3_1 |   ns_3_1      |   10
account3    |   cluster_3_1 |   ns_3_2      |   11

Since cluster_1_1 has multiple rows and one of them has namespace = "infra", that row is dropped. But cluster_1_2 and cluster_2_1 each have only one row, so they are kept. My code looks like this:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Build the sample DataFrame from Row objects
fields = Row('accountname','clustername','namespace','cost')
s1 = fields("account1","cluster_1_1","ns_1_1",10)
s2 = fields("account1","cluster_1_1","ns_1_2",11)
s3 = fields("account1","cluster_1_1","infra",12)
s4 = fields("account1","cluster_1_2","infra",12)
s5 = fields("account2","cluster_2_1","infra",13)
s6 = fields("account3","cluster_3_1","ns_3_1",10)
s7 = fields("account3","cluster_3_1","ns_3_2",11)
s8 = fields("account3","cluster_3_1","infra",12)

fieldsData=[s1,s2,s3,s4,s5,s6,s7,s8]
df=spark.createDataFrame(fieldsData)
df.show()

Thanks in advance.


Tags: from, fields, df, sql, namespace, pyspark, cluster, ns
1 Answer

Answered 2024-05-15 08:26:41

Check this out: you can first compute a count of rows per cluster using a window function partitioned by accountname & clustername, and then filter with the negation of the condition "count greater than 1 and namespace = infra".

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Window spanning all rows of the same (accountname, clustername) pair
w = Window.partitionBy("accountname", "clustername")

# Attach the per-cluster row count, then keep every row except those where
# the cluster has more than one row AND namespace == 'infra'
df.withColumn("count", F.count("clustername").over(w))\
    .filter(~((F.col("count")>1)&(F.col("namespace")=='infra')))\
    .drop("count").orderBy(F.col("accountname")).show()

+-----------+-----------+---------+----+
|accountname|clustername|namespace|cost|
+-----------+-----------+---------+----+
|   account1|cluster_1_1|   ns_1_1|  10|
|   account1|cluster_1_1|   ns_1_2|  11|
|   account1|cluster_1_2|    infra|  12|
|   account2|cluster_2_1|    infra|  13|
|   account3|cluster_3_1|   ns_3_1|  10|
|   account3|cluster_3_1|   ns_3_2|  11|
+-----------+-----------+---------+----+
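
For comparison, the same result can also be reached without a window function by aggregating the per-cluster row count and joining it back onto df. The snippet below is a minimal sketch assuming the same df as above; the counts variable and the "count" alias are illustrative names, not part of the original question.

from pyspark.sql import functions as F

# Alternative sketch: count rows per (accountname, clustername) with a
# groupBy aggregation, join the count back, then apply the same negated filter
counts = df.groupBy("accountname", "clustername").agg(F.count("*").alias("count"))

(df.join(counts, ["accountname", "clustername"], "left")
   .filter(~((F.col("count") > 1) & (F.col("namespace") == "infra")))
   .drop("count")
   .orderBy("accountname")
   .show())

Both versions express the same rule; the window approach avoids the extra join, while the groupBy/join version can be easier to read if the per-cluster count is reused elsewhere.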
