我有一个数据帧,其中包含的行表示用户对特定电影的分级实例。每个电影可以由多个用户在多个类别中进行分级。这是我用电影镜头数据创建的结果数据帧。你知道吗
|movie_id|year|categories|
+--------+----+----------+
| 122|1990| Comedy|
| 122|1990| Romance|
| 185|1990| Action|
| 185|1990| Crime|
| 185|1990| Thriller|
| 231|1990| Comedy|
| 292|1990| Action|
| 292|1990| Drama|
| 292|1990| Sci-Fi|
| 292|1990| Thriller|
| 316|1990| Action|
| 316|1990| Adventure|
| 316|1990| Sci-Fi|
| 329|1990| Action|
| 329|1990| Adventure|
| 329|1990| Drama|
.
.
.
电影id是电影的唯一id,年份是用户对电影进行评级的年份,类别是电影的12个类别之一。部分文件here
我想在每个类别中找到每十年中收视率最高的电影(计算每个类别中每十年中每部电影的频率)
像这样的
+-----------------------------------+
| year | category | movie_id | rank |
+-----------------------------------+
| 1990 | Comedy | 1273 | 1 |
| 1990 | Comedy | 6547 | 2 |
| 1990 | Comedy | 8973 | 3 |
.
.
| 1990 | Comedy | 7483 | 10 |
.
.
| 1990 | Drama | 1273 | 1 |
| 1990 | Drama | 6547 | 2 |
| 1990 | Drama | 8973 | 3 |
.
.
| 1990 | Comedy | 7483 | 10 |
.
.
| 2000 | Comedy | 1273 | 1 |
| 2000 | Comedy | 6547 | 2 |
.
.
for every decade, top 10 movies in each category
我知道需要使用pyspark窗口函数。这就是我试过的
windowSpec = Window.partitionBy(res_agg['year']).orderBy(res_agg['categories'].desc())
final = res_agg.select(res_agg['year'], res_agg['movie_id'], res_agg['categories']).withColumn('rank', func.rank().over(windowSpec))
但它会返回如下结果:
+----+--------+------------------+----+
|year|movie_id| categories|rank|
+----+--------+------------------+----+
|2000| 8606|(no genres listed)| 1|
|2000| 1587| Action| 1|
|2000| 1518| Action| 1|
|2000| 2582| Action| 1|
|2000| 5460| Action| 1|
|2000| 27611| Action| 1|
|2000| 48304| Action| 1|
|2000| 54995| Action| 1|
|2000| 4629| Action| 1|
|2000| 26606| Action| 1|
|2000| 56775| Action| 1|
|2000| 62008| Action| 1|
我对pyspark还很陌生,所以被困在这里。谁能告诉我我做错了什么。你知道吗
你是对的,你需要使用一个窗口,但是首先,你需要执行第一次聚合来计算频率。你知道吗
首先,让我们计算十年。你知道吗
然后我们按十年、类别和电影id计算频率:
最后,我们定义一个按年代和类别划分的窗口,并使用秩函数选择前10名:
相关问题 更多 >
编程相关推荐