Trying to combine two RDDs based on the movieId column, but the values in the second column come from the wrong row

Posted 2024-05-23 14:59:08


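I'm trying to combine two RDDs that have different numbers of partitions, based on the values in movieId. I found this custom function, which is useful, except that the second value comes from the wrong row.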

The datasets look like this:

movies.csv looks like:

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action

and ratings.csv looks like:

userId,movieId,rating,timestamp
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580
1,112,3.5,1094785740
1,151,4.0,1094785734
1,223,4.0,1112485573
1,253,4.0,1112484940
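As a plain-Python illustration (no Spark involved), the sample lines above can be parsed the same way the code's `map(...)` and header `filter(...)` steps parse them. The point is the row shapes: a ratings row becomes `[userId, movieId, rating]` (movieId at index 1) and a movies row becomes `[movieId, title, genres]` (movieId at index 0). The lists stand in for `sc.textFile(...)`, and the header removal assumes, as in the question, that `first()` returns the header line.

```python
# Sample lines copied from the question; in Spark they come from sc.textFile(...)
ratings_lines = [
    "userId,movieId,rating,timestamp",
    "1,2,3.5,1112486027",
    "1,29,3.5,1112484676",
]
movies_lines = [
    "movieId,title,genres",
    "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy",
    "2,Jumanji (1995),Adventure|Children|Fantasy",
]

# Ratings: split on commas and drop the last field (timestamp).
# [:-1] is the same slice as [:len(line.split(","))-1]
ratings = [line.split(",")[:-1] for line in ratings_lines]
movies = [line.split(",") for line in movies_lines]

# Mirror first() + filter(): drop every row equal to the header row
ratings_header, movies_header = ratings[0], movies[0]
ratings = [r for r in ratings if r != ratings_header]
movies = [m for m in movies if m != movies_header]

print(ratings)    # [['1', '2', '3.5'], ['1', '29', '3.5']]
print(movies[0])  # ['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy']
```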
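
from operator import itemgetter

def custom_zip(rdd1, rdd2):
    # Pairs the two RDDs element by element by position, not by movieId
    index = itemgetter(1)
    def prepare(rdd, npart):
        # zipWithIndex() turns every row into a (row, position) pair.
        # sortByKey() orders these pairs by their key, i.e. by the row itself;
        # `index` is passed as its first parameter, which is `ascending`.
        return (rdd.zipWithIndex()
                   .sortByKey(index, numPartitions=npart)
                   .keys())

    # zip() needs both sides to have the same number of partitions
    npart = rdd1.getNumPartitions() + rdd2.getNumPartitions()

    return prepare(rdd1, npart).zip(prepare(rdd2, npart))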

# Drop the last field (timestamp) from every ratings row
ratings_data = sc.textFile("/home/hadoop/assignment4/ml-20m/ratings.csv").map(lambda line: line.split(",")[:-1])
# Note: a plain split(",") also splits quoted titles that contain commas
movies_data = sc.textFile("/home/hadoop/assignment4/ml-20m/movies.csv").map(lambda line: line.split(","))

ratings_header = ratings_data.first()

movies_header = movies_data.first()

ratings_data = ratings_data.filter(lambda line: line != ratings_header)

movies_data = movies_data.filter(lambda line: line != movies_header)

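# Pair the rows, flatten each (ratings_row, movies_row) pair into one list,
# then keep the first three fields and the last two
data = (custom_zip(ratings_data, movies_data)
        .map(lambda x: [item for sublist in x for item in sublist])
        .map(lambda x: x[:3] + x[-2:]))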

print(data.take(1))
#[u'1', u'1009', u'3.5', u'1', u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy']
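A plain-Python sketch of what `prepare()` does to the ratings rows, using the nine sample ratings rows from the question (`enumerate` stands in for `zipWithIndex()`). It assumes PySpark's signature `sortByKey(ascending=True, numPartitions=None, keyfunc=...)`: passed positionally, `index` lands in `ascending`, so the `(row, position)` pairs end up ordered by the row itself, compared as lists of strings. Sorting by position, as `sortBy(index)` would, keeps the file order instead.

```python
from operator import itemgetter

# Ratings rows after line.split(",")[:-1], in file order (sample rows from the question)
ratings = [["1", "2", "3.5"], ["1", "29", "3.5"], ["1", "32", "3.5"],
           ["1", "47", "3.5"], ["1", "50", "3.5"], ["1", "112", "3.5"],
           ["1", "151", "4.0"], ["1", "223", "4.0"], ["1", "253", "4.0"]]

# zipWithIndex(): each element becomes (row, position)
indexed = [(row, i) for i, row in enumerate(ratings)]

# sortByKey() with the default keyfunc: order by the key of each pair, i.e. the row
by_row = [row for row, _ in sorted(indexed, key=itemgetter(0))]

# Ordering by the position instead keeps the original file order
by_index = [row for row, _ in sorted(indexed, key=itemgetter(1))]

print(by_row[0])    # ['1', '112', '3.5']  ('112' sorts before '2' as a string)
print(by_index[0])  # ['1', '2', '3.5']
```

Under this reading, the first ratings row handed to `zip()` is the lexicographically smallest row rather than the first line of the file, which is consistent with a row such as `1009` being paired with Toy Story.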

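The value 1009 comes from row 21 of the RDD; it should be a different value. Can someone help me figure out what I'm doing wrong? I just want to combine the two RDDs.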
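If the goal is to line up each rating with the movie that has the same movieId, a key-based join matches rows by value instead of by position. The sketch below is plain Python: `pair_join` is a hypothetical helper that imitates the output shape of PySpark's `RDD.join` (an inner join producing `(key, (left_value, right_value))` pairs). In Spark, the assumed equivalent would be keying both RDDs with `map` and calling `join`, as noted in the comments. The sample rows are taken from the question.

```python
from collections import defaultdict

def pair_join(left, right):
    """Inner join on key, shaped like PySpark's RDD.join: (key, (left_value, right_value))."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

# Parsed sample rows from the question (headers already removed)
ratings = [["1", "2", "3.5"], ["1", "29", "3.5"], ["1", "32", "3.5"]]
movies = [["1", "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"],
          ["2", "Jumanji (1995)", "Adventure|Children|Fantasy"],
          ["3", "Grumpier Old Men (1995)", "Comedy|Romance"]]

# Key each row by movieId: index 1 in a ratings row, index 0 in a movies row.
# Spark equivalent (assumed): ratings_data.map(lambda r: (r[1], r)) and movies_data.map(lambda m: (m[0], m))
ratings_by_movie = [(r[1], r) for r in ratings]
movies_by_movie = [(m[0], m) for m in movies]

# Join on movieId, then keep userId, movieId, rating, title, genres
data = [r + m[1:] for _, (r, m) in pair_join(ratings_by_movie, movies_by_movie)]
print(data)  # [['1', '2', '3.5', 'Jumanji (1995)', 'Adventure|Children|Fantasy']]
```

Unlike `zip()`, a join does not need both sides to have the same number of partitions or elements; rows without a matching movieId (here 29 and 32) are simply dropped by the inner join.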

