pySpark计数ID处于状态

2024-04-19 23:49:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下数据集,并使用PySpark

df = sparkSession.createDataFrame([(5, 'Samsung', '2018-02-23'),
                                   (8, 'Apple', '2018-02-22'),
                                   (5, 'Sony', '2018-02-21'),
                                   (5, 'Samsung', '2018-02-20'),
                                   (8, 'LG', '2018-02-20')],
                                   ['ID', 'Product', 'Date']
                                  )

+---+-------+----------+
| ID|Product|      Date|
+---+-------+----------+
|  5|Samsung|2018-02-23|
|  8|  Apple|2018-02-22|
|  5|   Sony|2018-02-21|
|  5|Samsung|2018-02-20|
|  8|     LG|2018-02-20|
+---+-------+----------+
# Each ID will appear ALWAYS at least 2 times (do not consider the case of unique IDs in this df)

每个ID只应在表示较高频率时递增产品计数器。 在相同频率的情况下,最近的日期应决定哪个产品收到+1。在

从上面的示例中,所需的输出将是:

^{pr2}$

PySpark实现这个结果的最有效方法是什么?在


Tags: 数据idappledfdate产品productwill
1条回答
网友
1楼 · 发布于 2024-04-19 23:49:12

IIUC,您需要为每个ID选择最常用的产品,使用 最近的Date

因此,首先,我们可以使用以下方法获得每个产品/ID对的计数:

import pyspark.sql.functions as f
from pyspark.sql import Window

df = df.select(
    'ID',
    'Product',
    'Date', 
    f.count('Product').over(Window.partitionBy('ID', 'Product')).alias('count')
)
df.show()
#+ -+   -+     +  -+
#| ID|Product|      Date|count|
#+ -+   -+     +  -+
#|  5|   Sony|2018-02-21|    1|
#|  8|     LG|2018-02-20|    1|
#|  8|  Apple|2018-02-22|    1|
#|  5|Samsung|2018-02-23|    2|
#|  5|Samsung|2018-02-20|    2|
#+ -+   -+     +  -+

现在您可以使用Window为每个ID对每个产品进行排序。我们可以使用pyspark.sql.functions.desc()count和{}降序排序。如果row_number()等于1,则表示该行是第一行。在

^{pr2}$

最后groupBy()生成并为Counter选择最大值的值:

df.groupBy('Product').agg(f.max('Counter').alias('Counter')).show()
#+   -+   -+
#|Product|Counter|
#+   -+   -+
#|   Sony|      0|
#|Samsung|      1|
#|     LG|      0|
#|  Apple|      1|
#+   -+   -+

更新

这里有一个简单的方法:

w = Window.partitionBy('ID').orderBy(f.desc('count'), f.desc('Date'))
df.groupBy('ID', 'Product')\
    .agg(f.max('Date').alias('Date'), f.count('Product').alias('Count'))\
    .select('Product', (f.row_number().over(w) == 1).cast("int").alias('Counter'))\
    .show()
#+   -+   -+
#|Product|Counter|
#+   -+   -+
#|Samsung|      1|
#|   Sony|      0|
#|  Apple|      1|
#|     LG|      0|
#+   -+   -+

相关问题 更多 >