使用窗口函数的pyspark

2024-06-16 08:25:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个数据帧,其中包含的行表示用户对特定电影的分级实例。每个电影可以由多个用户在多个类别中进行分级。这是我用电影镜头数据创建的结果数据帧。你知道吗

|movie_id|year|categories|
+--------+----+----------+
|     122|1990|    Comedy|
|     122|1990|   Romance|
|     185|1990|    Action|
|     185|1990|     Crime|
|     185|1990|  Thriller|
|     231|1990|    Comedy|
|     292|1990|    Action|
|     292|1990|     Drama|
|     292|1990|    Sci-Fi|
|     292|1990|  Thriller|
|     316|1990|    Action|
|     316|1990| Adventure|
|     316|1990|    Sci-Fi|
|     329|1990|    Action|
|     329|1990| Adventure|
|     329|1990|     Drama|
.
.
.

电影id是电影的唯一id,年份是用户对电影进行评级的年份,类别是电影的12个类别之一。部分文件here

我想在每个类别中找到每十年中收视率最高的电影(计算每个类别中每十年中每部电影的频率)

像这样的

+-----------------------------------+
| year | category | movie_id | rank |
+-----------------------------------+
| 1990 | Comedy   | 1273     | 1    |
| 1990 | Comedy   | 6547     | 2    |
| 1990 | Comedy   | 8973     | 3    |
.
.
| 1990 | Comedy   | 7483     | 10   |
.
.
| 1990 | Drama    | 1273     | 1    |
| 1990 | Drama    | 6547     | 2    |
| 1990 | Drama    | 8973     | 3    |
.
.
| 1990 | Comedy   | 7483     | 10   |  
.
.
| 2000 | Comedy   | 1273     | 1    |
| 2000 | Comedy   | 6547     | 2    |
.
.

for every decade, top 10 movies in each category 

我知道需要使用pyspark窗口函数。这就是我试过的

windowSpec = Window.partitionBy(res_agg['year']).orderBy(res_agg['categories'].desc())

final = res_agg.select(res_agg['year'], res_agg['movie_id'], res_agg['categories']).withColumn('rank', func.rank().over(windowSpec))

但它会返回如下结果:

+----+--------+------------------+----+
|year|movie_id|        categories|rank|
+----+--------+------------------+----+
|2000|    8606|(no genres listed)|   1|
|2000|    1587|            Action|   1|
|2000|    1518|            Action|   1|
|2000|    2582|            Action|   1|
|2000|    5460|            Action|   1|
|2000|   27611|            Action|   1|
|2000|   48304|            Action|   1|
|2000|   54995|            Action|   1|
|2000|    4629|            Action|   1|
|2000|   26606|            Action|   1|
|2000|   56775|            Action|   1|
|2000|   62008|            Action|   1|

我对pyspark还很陌生,所以被困在这里。谁能告诉我我做错了什么。你知道吗


Tags: 数据用户id电影resactionmovie类别
1条回答
网友
1楼 · 发布于 2024-06-16 08:25:18

你是对的,你需要使用一个窗口,但是首先,你需要执行第一次聚合来计算频率。你知道吗

首先,让我们计算十年。你知道吗

df_decade = df.withColumn("decade", concat(substring(col("year"), 0, 3), lit("0")))

然后我们按十年、类别和电影id计算频率:

agg_df = df_decade\
      .groupBy("decade", "category", "movie_id")\
      .agg(count(col("*")).alias("freq"))

最后,我们定义一个按年代和类别划分的窗口,并使用秩函数选择前10名:

w = Window.partitionBy("decade", "category").orderBy(desc("freq"))
top10 = agg_df.withColumn("r", rank().over(w)).where(col("r") <= 10)

相关问题 更多 >