How to join key-value RDDs whose keys are tuples, in Spark with Python?

Published 2024-06-16 10:47:04


How can I perform a join of key-value RDDs in Apache Spark with Python?

Here are my two RDDs:

rddUser:[((u'M', '[68-73]', u'B'), u'TwoFace'), ((u'F', '[33-38]', u'Fr'), u'Catwoman'), ((u'Female', '[23-28]', u'L'), u'HarleyQuinn'), ((u'M', '[75+]', u'L'), u'Joker'), ((u'F', '[28-33]', u'Belgium'), u'PoisonIvy')]
rdd:[((u'F', '[23-28]', u'L'), 180.0), ((u'F', '[28-33]', u'B'), 60.0), ((u'F', '[33-38]', u'Fr'), 56.0), ((u'M', '[68-73]', u'B'), 136.0), ((u'M', '[75+]', u'L'), 98.0)]

I tried this:

print rddUser.join(rdd).collect()

but Spark hangs on this line.

Expected result (or something similar):

((u'M', '[68-73]', u'B'), u'TwoFace', 136.0)

How can I do this?

Edit:

It works fine in the pyspark shell, but when I run it from my script, the script hangs at that line. After 30 minutes, this log shows up:

17/04/27 12:25:22 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.2.76:40028 in memory (size: 11.8 KB, free: 366.3 MB)
17/04/27 12:25:22 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.2.76:40028 in memory (size: 5.8 KB, free: 366.3 MB)
17/04/27 12:25:22 INFO ContextCleaner: Cleaned accumulator 135
17/04/27 12:25:22 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 192.168.2.76:40028 in memory (size: 10.7 KB, free: 366.3 MB)
17/04/27 12:25:22 INFO BlockManagerInfo: Removed broadcast_5_piece0 on 192.168.2.76:40028 in memory (size: 399.0 B, free: 366.3 MB)
17/04/27 12:25:22 INFO BlockManagerInfo: Removed broadcast_6_piece0 on 192.168.2.76:40028 in memory (size: 9.5 KB, free: 366.3 MB)
17/04/27 12:25:22 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 192.168.2.76:40028 in memory (size: 5.0 KB, free: 366.3 MB)

After 1h30, nothing more happens.


1 Answer

#1 · Posted 2024-06-16 10:47:04

join works fine in PySpark, but with your data it produces results in the form (k, (v1, v2)) rather than the expected (k, v1, v2). You can apply a map to reshape it.

The following works fine for me:

rddUser = sc.parallelize([((u'M', '[68-73]', u'B'), u'TwoFace'), 
                          ((u'F', '[33-38]', u'Fr'), u'Catwoman'), 
                          ((u'Female', '[23-28]', u'L'), u'HarleyQuinn'), 
                          ((u'M', '[75+]', u'L'), u'Joker'), 
                          ((u'F', '[28-33]', u'Belgium'), u'PoisonIvy')])

rdd = sc.parallelize([((u'F', '[23-28]', u'L'), 180.0), 
                     ((u'F', '[28-33]', u'B'), 60.0), 
                     ((u'F', '[33-38]', u'Fr'), 56.0), 
                     ((u'M', '[68-73]', u'B'), 136.0), 
                     ((u'M', '[75+]', u'L'), 98.0)])

rddUser.join(rdd).collect()

Output:

[(('M', '[68-73]', 'B'), ('TwoFace', 136.0)),
 (('F', '[33-38]', 'Fr'), ('Catwoman', 56.0)),
 (('M', '[75+]', 'L'), ('Joker', 98.0))]
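The map mentioned above could look like the sketch below. The flattening logic is shown on plain Python tuples so it runs without a Spark cluster; in Spark you would pass the same lambda to `.map()` on the join result, e.g. `rddUser.join(rdd).map(lambda kv: (kv[0], kv[1][0], kv[1][1])).collect()`:

```python
# join() output pairs each key with a tuple of the two values: (k, (v1, v2)).
# Sample data taken from the join result above:
joined = [(('M', '[68-73]', 'B'), ('TwoFace', 136.0)),
          (('F', '[33-38]', 'Fr'), ('Catwoman', 56.0)),
          (('M', '[75+]', 'L'), ('Joker', 98.0))]

# Flatten (k, (v1, v2)) into the expected (k, v1, v2) triple.
flatten = lambda kv: (kv[0], kv[1][0], kv[1][1])

flat = [flatten(kv) for kv in joined]
print(flat[0])  # (('M', '[68-73]', 'B'), 'TwoFace', 136.0)
```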
