Spark:在RDD map()中使用迭代器lambda函数
我在HDFS上有一个简单的数据集,准备把它加载到Spark中。数据集的样子是这样的:
1 1 1 1 1 1 1
1 1 1 1 1 1 1
1 1 1 1 1 1 1
...
基本上就是一个矩阵。我想实现一个功能,需要对矩阵的行进行分组,所以我想给每一行添加一个唯一的键,像这样:
(1, [1 1 1 1 1 ... ])
(2, [1 1 1 1 1 ... ])
(3, [1 1 1 1 1 ... ])
...
我尝试了一种比较简单的方法:设置一个全局变量,然后写一个lambda函数来遍历这个全局变量:
# initialize global index
global global_index
global_index = 0
# function to generate keys
def generateKeys(x):
global_index+=1
return (global_index,x)
# read in data and operate on it
data = sc.textFile("/data.txt")
...some preprocessing...
data.map(generateKeys)
但是好像没有识别到这个全局变量的存在。
有没有什么简单的方法可以做到这一点呢?
谢谢,
Jack
2 个回答
试试用 dataRdd.zipWithIndex
这个方法,最后如果你一定要把索引放在前面,可以把得到的结果进行交换。
>>> assignId("lonely line")
(15, 'lonely line')
enumerate
可以为可迭代的每个项目生成一个唯一的索引,并返回一个包含 (索引, 原始项目)
的元组。
如果你想从 0
以外的数字开始编号,可以把起始值作为第二个参数传给 enumerate
。
>>> lsts = [
... [1, 1, 1, 1, 1, 1],
... [1, 1, 1, 1, 1, 1],
... [1, 1, 1, 1, 1, 1],
... [1, 1, 1, 1, 1, 1],
... [1, 1, 1, 1, 1, 1],
... [1, 1, 1, 1, 1, 1],
... [1, 1, 1, 1, 1, 2],
... [1, 1, 1, 2, 1, 2]
... ]
...
>>> list(enumerate(lsts))
[(0, [1, 1, 1, 1, 1, 1]),
(1, [1, 1, 1, 1, 1, 1]),
(2, [1, 1, 1, 1, 1, 1]),
(3, [1, 1, 1, 1, 1, 1]),
(4, [1, 1, 1, 1, 1, 1]),
(5, [1, 1, 1, 1, 1, 1]),
(6, [1, 1, 1, 1, 1, 2]),
(7, [1, 1, 1, 2, 1, 2])]
需要注意的是,list
是用来从 enumerate
中获取真实值的,因为 enumerate
是一个迭代器,而不是返回列表的函数。
替代方案:全局可用的 ID 生成器
enumerate
使用起来很简单,但如果你需要在代码的不同部分分配 ID,这就会变得困难或者不可能。在这种情况下,像 OP 中提到的全局可用生成器就是一个不错的选择。
itertools
提供了 count
,可以满足我们的需求:
>>> list(enumerate(lsts, 1))
[(1, [1, 1, 1, 1, 1, 1]),
(2, [1, 1, 1, 1, 1, 1]),
(3, [1, 1, 1, 1, 1, 1]),
(4, [1, 1, 1, 1, 1, 1]),
(5, [1, 1, 1, 1, 1, 1]),
(6, [1, 1, 1, 1, 1, 1]),
(7, [1, 1, 1, 1, 1, 2]),
(8, [1, 1, 1, 2, 1, 2])]
现在我们有了一个(全局可用的)idgen
生成器,可以用来生成唯一的 ID。
我们可以通过一个名为 prid
(打印 ID)的函数来测试它:
>>> from itertools import count
>>> idgen = count()
它工作正常,我们可以在一组值上进行测试:
>>> def prid():
... id = idgen.next()
... print id
...
>>> prid()
0
>>> prid()
1
>>> prid()
2
>>> prid()
3
并定义一个实际的函数,当调用它并传入一个值时,会返回一个元组 (id, value)
。
>>> lst = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109']
注意,不需要将 idgen
声明为全局变量,因为我们不会改变它的值(idgen
只会在调用时改变内部状态,但仍然是同一个生成器)。
测试一下它是否有效:
>>> def assignId(val):
... return (idgen.next(), val)
...
并在列表上尝试一下:
>>> assignId("ahahah")
(4, 'ahahah')
与 enumerate
的解决方案相比,主要的区别在于,我们可以在代码的任何地方逐个分配 ID,而不需要在一个处理 enumerate
的过程中完成所有操作。
>>> map(assignId, lst)
[(5, '100'),
(6, '101'),
(7, '102'),
(8, '103'),
(9, '104'),
(10, '105'),
(11, '106'),
(12, '107'),
(13, '108'),
(14, '109')]