我有以下形式的RDD:
[[['a'],['b,c,d','e,f,g']],[['h'],['i,j,k','l,m,n']]]
我想要达到的目标:
^{pr2}$我所做的:
def pass_row(line):
new_line = []
key = ''.join(line[0])
for el in line[1]:
el = key +' ,'+ el
new_line.append(el)
return new_line
rdd.map(pass_row)
它适用于较小的数据样本。但是,当我试图在我的整个数据集上运行list index out of range
时,我正在for el in line[1]:
行上运行它。。。在
基本上,{cd3{cd3}集合中有不同的值。我的最终目标是以行的形式将其作为spark dataframe:
col1 col2 col3 col 4
a b c d
a e f g
h i j k
h l m n
谢谢你的建议!在
您的错误似乎与您的数据有关,而不是函数(它似乎是正确的,尽管有点过于复杂),而且看起来您将它应用于没有
line[1]
的行。在您能否确保
line
的元素数在实际数据集中是常量,例如:也就是说,对于您的实际目标,您可能应该停止从该点开始处理字符串,而直接将数据作为二维数组获取,例如:
^{pr2}$警告这里,您不能用该解决方案直接向数据帧提供数据,因为每一组前缀生成的行仍然嵌套在它自己的列表中(它只是一个“2D数组列表”)。在
例如,使用sum函数,您可以在reduce步骤中轻松地进行转置:
因此,您的解决方案需要3个步骤:
pass_row
映射数据集sum
内置函数来减少结果,该函数应用于初始累加器[]
在纯Python中,下面的一个代码行可以完成这个任务
相关问题 更多 >
编程相关推荐