Pyspark RDD list index out of range

2024-04-24 00:33:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下形式的RDD:

[[['a'],['b,c,d','e,f,g']],[['h'],['i,j,k','l,m,n']]]

我想要达到的目标:

^{pr2}$

我所做的:

def pass_row(line):
  new_line = []
  key = ''.join(line[0])
  for el in line[1]:
    el = key +' ,'+ el
    new_line.append(el)
  return new_line

rdd.map(pass_row)

它适用于较小的数据样本。但是,当我试图在我的整个数据集上运行list index out of range时,我正在for el in line[1]:行上运行它。。。在

基本上,{cd3{cd3}集合中有不同的值。我的最终目标是以行的形式将其作为spark dataframe:

col1 col2 col3 col 4
a     b    c    d
a     e    f    g
h     i    j    k
h     l    m    n

谢谢你的建议!在


Tags: 数据keyin目标newfordefline
1条回答
网友
1楼 · 发布于 2024-04-24 00:33:47

您的错误似乎与您的数据有关,而不是函数(它似乎是正确的,尽管有点过于复杂),而且看起来您将它应用于没有line[1]的行。在

您能否确保line的元素数在实际数据集中是常量,例如:

def pass_row(line):
    assert len(line) == 2
    return [ "%s, %s" % (''.join(line[0]), el) for el in line[1]]

也就是说,对于您的实际目标,您可能应该停止从该点开始处理字符串,而直接将数据作为二维数组获取,例如:

^{pr2}$

警告这里,您不能用该解决方案直接向数据帧提供数据,因为每一组前缀生成的行仍然嵌套在它自己的列表中(它只是一个“2D数组列表”)。在

例如,使用sum函数,您可以在reduce步骤中轻松地进行转置:

>>> sum(b, [])
[['a', 'b', 'c', 'd'], ['a', 'e', 'f', 'g'], ['h', 'i', 'j', 'k'], ['h', 'l', 'm', 'n']]

因此,您的解决方案需要3个步骤:

  • 使用pass_row映射数据集
  • 使用sum内置函数来减少结果,该函数应用于初始累加器[]
  • 将结果输入Spark数据帧

在纯Python中,下面的一个代码行可以完成这个任务

>>> fn = lambda ls : sum([ [ i[0] + el.split(',') for el in i[1]] for i in ls ], [])
>>> fn([[['a'],['b,c,d','e,f,g']],[['h'],['i,j,k','l,m,n']]])
[['a', 'b', 'c', 'd'], ['a', 'e', 'f', 'g'], ['h', 'i', 'j', 'k'], ['h', 'l', 'm', 'n']]

相关问题 更多 >