Spark:在RDD map()中使用迭代器lambda函数

1 投票
2 回答
3084 浏览
提问于 2025-04-18 12:53

我在HDFS上有一个简单的数据集,准备把它加载到Spark中。数据集的样子是这样的:

1 1 1 1 1 1 1
1 1 1 1 1 1 1
1 1 1 1 1 1 1
...

基本上就是一个矩阵。我想实现一个功能,需要对矩阵的行进行分组,所以我想给每一行添加一个唯一的键,像这样:

(1, [1 1 1 1 1 ... ])
(2, [1 1 1 1 1 ... ])
(3, [1 1 1 1 1 ... ])
...

我尝试了一种比较简单的方法:设置一个全局变量,然后写一个lambda函数来遍历这个全局变量:

# initialize global index
global global_index
global_index = 0

# function to generate keys
def generateKeys(x):
    global_index+=1
    return (global_index,x)

# read in data and operate on it
data = sc.textFile("/data.txt")

...some preprocessing...

data.map(generateKeys)

但是好像没有识别到这个全局变量的存在。

有没有什么简单的方法可以做到这一点呢?

谢谢,
Jack

2 个回答

0

试试用 dataRdd.zipWithIndex 这个方法,最后如果你一定要把索引放在前面,可以把得到的结果进行交换。

2
>>> assignId("lonely line")
(15, 'lonely line')

enumerate 可以为可迭代的每个项目生成一个唯一的索引,并返回一个包含 (索引, 原始项目) 的元组。

如果你想从 0 以外的数字开始编号,可以把起始值作为第二个参数传给 enumerate

>>> lsts = [
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 2],
...     [1, 1, 1, 2, 1, 2]
...     ]
...
>>> list(enumerate(lsts))
[(0, [1, 1, 1, 1, 1, 1]),
 (1, [1, 1, 1, 1, 1, 1]),
 (2, [1, 1, 1, 1, 1, 1]),
 (3, [1, 1, 1, 1, 1, 1]),
 (4, [1, 1, 1, 1, 1, 1]),
 (5, [1, 1, 1, 1, 1, 1]),
 (6, [1, 1, 1, 1, 1, 2]),
 (7, [1, 1, 1, 2, 1, 2])]

需要注意的是,list 是用来从 enumerate 中获取真实值的,因为 enumerate 是一个迭代器,而不是返回列表的函数。

替代方案:全局可用的 ID 生成器

enumerate 使用起来很简单,但如果你需要在代码的不同部分分配 ID,这就会变得困难或者不可能。在这种情况下,像 OP 中提到的全局可用生成器就是一个不错的选择。

itertools 提供了 count,可以满足我们的需求:

>>> list(enumerate(lsts, 1))
[(1, [1, 1, 1, 1, 1, 1]),
 (2, [1, 1, 1, 1, 1, 1]),
 (3, [1, 1, 1, 1, 1, 1]),
 (4, [1, 1, 1, 1, 1, 1]),
 (5, [1, 1, 1, 1, 1, 1]),
 (6, [1, 1, 1, 1, 1, 1]),
 (7, [1, 1, 1, 1, 1, 2]),
 (8, [1, 1, 1, 2, 1, 2])]

现在我们有了一个(全局可用的)idgen 生成器,可以用来生成唯一的 ID。

我们可以通过一个名为 prid(打印 ID)的函数来测试它:

>>> from itertools import count
>>> idgen = count()

它工作正常,我们可以在一组值上进行测试:

>>> def prid():
...     id = idgen.next()
...     print id
...
>>> prid()
0
>>> prid()
1
>>> prid()
2
>>> prid()
3

并定义一个实际的函数,当调用它并传入一个值时,会返回一个元组 (id, value)

>>> lst = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109']

注意,不需要将 idgen 声明为全局变量,因为我们不会改变它的值(idgen 只会在调用时改变内部状态,但仍然是同一个生成器)。

测试一下它是否有效:

>>> def assignId(val):
...     return (idgen.next(), val)
...

并在列表上尝试一下:

>>> assignId("ahahah")
(4, 'ahahah')

enumerate 的解决方案相比,主要的区别在于,我们可以在代码的任何地方逐个分配 ID,而不需要在一个处理 enumerate 的过程中完成所有操作。

>>> map(assignId, lst)
[(5, '100'),
 (6, '101'),
 (7, '102'),
 (8, '103'),
 (9, '104'),
 (10, '105'),
 (11, '106'),
 (12, '107'),
 (13, '108'),
 (14, '109')]

撰写回答