PySpark: create a pair RDD from two keys that share the same value

Posted 2024-06-17 15:09:41


I have a key-value pair RDD in which the key is an actor and the value is a movie that actor appeared in, in the form:

["actor 1", "movie 1"]
["actor 1", "movie 2"]
["actor 1", "movie 3"]
...
["actor n", "movie 2"] 

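I want to map it to another key-value pair RDD in which each pair consists of two actors who appeared in a common movie.

In the example above, this means the new RDD would contain the pair ["actor 1", "actor n"], because both appeared in "movie 2".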


2 answers

This isn't exactly what you asked for, but I think it's good enough:

from itertools import combinations

# `sc` is an existing SparkContext.
movies = sc.parallelize([("P", "SW4"), ("P", "SW5"), ("P", "SW6"),
                         ("A", "SW4"), ("A", "SW5"),
                         ("B", "SW5"), ("B", "SW6"),
                         ("W", "SW4"),
                         ("X", "SW1"), ("X", "SW7"), ("X", "SW2"), ("X", "SW3"),
                         ("Y", "SW1"), ("Y", "SW7"), ("Y", "SW2"), ("Y", "SW3")])

# Turn (actor, movie) into (movie, actor) so that grouping is done by movie.
swap_tuple = lambda kv: (kv[1], kv[0])

# (movie, [actor, actor, ...])
movies = movies.map(swap_tuple).groupByKey().mapValues(list)

# Emit (movie, (actor_a, actor_b)) for every pair of actors in the same movie.
all_pairs = movies.flatMap(
    lambda kv: [(kv[0], pair) for pair in combinations(kv[1], 2)])

print(all_pairs.collect())

""" 
    >> [('SW1', ('X', 'Y')),
        ('SW3', ('X', 'Y')),
        ('SW5', ('P', 'A')),
        ('SW5', ('P', 'B')),
        ('SW5', ('A', 'B')),
        ('SW7', ('X', 'Y')),
        ('SW2', ('X', 'Y')),
        ('SW4', ('P', 'A')),
        ('SW4', ('P', 'W')),
        ('SW4', ('A', 'W')),
        ('SW6', ('P', 'B'))]
"""

Here is a run of this code in an .ipynb notebook.
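If only the actor pairs are wanted, as in the question, the movie key can be dropped and the pairs deduplicated. The following is a minimal plain-Python sketch of that idea, not Spark code; the function name `co_star_pairs` and the shortened sample data are made up for illustration.

```python
from collections import defaultdict
from itertools import combinations


def co_star_pairs(records):
    """Return the set of actor pairs that share at least one movie."""
    actors_by_movie = defaultdict(set)
    for actor, movie in records:
        actors_by_movie[movie].add(actor)

    pairs = set()
    for actors in actors_by_movie.values():
        # sorted() fixes the order inside each pair, so ("A", "P") and
        # ("P", "A") collapse into a single entry.
        pairs.update(combinations(sorted(actors), 2))
    return pairs


# Hypothetical, shortened sample data in (actor, movie) form.
records = [("P", "SW4"), ("P", "SW5"), ("A", "SW4"), ("A", "SW5"), ("W", "SW4")]
print(sorted(co_star_pairs(records)))
# [('A', 'P'), ('A', 'W'), ('P', 'W')]
```

On an RDD, the analogous step would presumably be to sort each pair, call `.values()` to drop the movie, and finish with `.distinct()`.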

A simple swap and join will do it. First, let's create some dummy data and a small helper function:

actor_movie = sc.parallelize([
    ("actor 1", "movie 1"),
    ("actor 1", "movie 2"),
    ("actor 1", "movie 3"),
    ("actor n", "movie 2")
])

swap = lambda x: (x[1], x[0])

Next, swap the order:

movie_actor = actor_movie.map(swap)

Then join:

(movie_actor
    .join(movie_actor) # Join by movie
    .values() # Extract values (actors)
    .filter(lambda x: x[0] != x[1]))
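Note that a self-join yields each pair in both orders, and the `!=` filter keeps both. The sketch below models the join locally in plain Python to make that visible; `self_join_pairs` is a hypothetical stand-in, not a Spark API, and the sample data assumes that actor 1 and actor n share "movie 2", as in the question.

```python
from collections import defaultdict


def self_join_pairs(actor_movie):
    """Local stand-in for movie_actor.join(movie_actor).values()."""
    actors_by_movie = defaultdict(list)
    for actor, movie in actor_movie:
        actors_by_movie[movie].append(actor)
    # A self-join pairs every actor of a movie with every actor of the
    # same movie, including itself and in both orders.
    return [(a, b)
            for actors in actors_by_movie.values()
            for a in actors
            for b in actors]


actor_movie = [
    ("actor 1", "movie 1"),
    ("actor 1", "movie 2"),
    ("actor 1", "movie 3"),
    ("actor n", "movie 2"),
]

joined = self_join_pairs(actor_movie)

# The `!=` filter drops self-pairs but keeps both orderings.
different = [p for p in joined if p[0] != p[1]]

# Using `<` instead keeps one ordering; the set removes repeats that
# arise when two actors share more than one movie.
canonical = sorted({p for p in joined if p[0] < p[1]})

print(different)  # [('actor 1', 'actor n'), ('actor n', 'actor 1')]
print(canonical)  # [('actor 1', 'actor n')]
```

In Spark, the same effect should come from filtering with `x[0] < x[1]` instead of `x[0] != x[1]` and then calling `.distinct()`.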
