Filter rows of a Spark dataframe based on a count condition over a specific column value [spark.sql syntax in PySpark]

I have the following Spark dataframe:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

datalake_spark_dataframe_downsampled = pd.DataFrame(
                           {'id' : ['001', '001', '001', '001', '001', '002', '002', '002'],
                            'OuterSensorConnected': [0, 0, 0, 1, 0, 0, 0, 1],
                            'OuterHumidity': [31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826],
                            'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70, 70],
                            'DaysDeploymentDate': [10, 20, 21, 31, 41, 11, 19, 57],
                            'label': [0, 0, 1, 1, 1, 0, 0, 1]}
                           )
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled)

# printSchema of the datalake_spark_dataframe_downsampled (spark df):

"root
 |-- id: string (nullable = true)
 |-- OuterSensorConnected: integer (nullable = false)
 |-- OuterHumidity: float (nullable = true)
 |-- EnergyConsumption: float (nullable = true)
 |-- DaysDeploymentDate: integer (nullable = true)
 |-- label: integer (nullable = false)"

As you can see, I have 5 rows for the first id '001' and 3 rows for the second id '002'. What I want is to filter out the rows of any id whose total count of positive labels ('1') is less than 2. Since the first id '001' has 3 positive labels (three rows with label 1), while the second id '002' has only one row with a positive label, I want to filter out all rows related to id '002'. So my final df would look like this:

datalake_spark_dataframe_downsampled_filtered = pd.DataFrame( 
                           {'id' : ['001', '001', '001', '001', '001'],
                            'OuterSensorConnected': [0, 0, 0, 1, 0], 
                            'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826],
                            'EnergyConsumption': [70, 70, 70, 70, 70],
                            'DaysDeploymentDate': [10, 20, 21, 31, 41],
                            'label': [0, 0, 1, 1, 1]}
                           )
datalake_spark_dataframe_downsampled_filtered = spark.createDataFrame(datalake_spark_dataframe_downsampled_filtered)

How can I achieve this through a spark.sql() query?

datalake_spark_dataframe_downsampled.createOrReplaceTempView("df_filtered")

spark_dataset_filtered = spark.sql("""SELECT *, count(label) AS counted_label FROM df_filtered GROUP BY id HAVING counted_label >= 2""")  # how to only count the positive values here?
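For the conditional count itself, one option (a sketch, not part of the original post) is conditional aggregation: summing a CASE expression counts only the rows where label equals 1. Note this aggregates to one row per id, so the qualifying ids would still need to be joined back to the original table to recover every row:

qualifying_ids = spark.sql("""
    SELECT id,
           SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END) AS positive_labels
    FROM df_filtered
    GROUP BY id
    HAVING SUM(CASE WHEN label = 1 THEN 1 ELSE 0 END) >= 2
""")
# With a 0/1 label column, SUM(label) is equivalent to the CASE expression.
qualifying_ids.show()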

Tags: id, true, dataframe, df, label, filtered, spark
1 Answer

How about using a window:

datalake_spark_dataframe_downsampled.createOrReplaceTempView("df_filtered")

spark.sql("""select * from (select *, sum(label) over (partition by id) as Sum_l
                      from df_filtered) where Sum_l >= 2""").drop("Sum_l").show()

+---+--------------------+-------------+-----------------+------------------+-----+
| id|OuterSensorConnected|OuterHumidity|EnergyConsumption|DaysDeploymentDate|label|
+---+--------------------+-------------+-----------------+------------------+-----+
|001|                   0|    31.784826|               70|                10|    0|
|001|                   0|    32.784826|               70|                20|    0|
|001|                   0|    33.784826|               70|                21|    1|
|001|                   1|    43.784826|               70|                31|    1|
|001|                   0|    23.784826|               70|                41|    1|
+---+--------------------+-------------+-----------------+------------------+-----+
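The same logic can also be written with the DataFrame API instead of spark.sql(). This is a sketch of the equivalent, assuming the original datalake_spark_dataframe_downsampled is in scope:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sum the 0/1 label column per id; since label is binary, the sum is
# exactly the count of positive labels. Keep rows where that count >= 2.
w = Window.partitionBy("id")
filtered = (datalake_spark_dataframe_downsampled
            .withColumn("Sum_l", F.sum("label").over(w))
            .filter(F.col("Sum_l") >= 2)
            .drop("Sum_l"))
filtered.show()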
