PySpark 删除行
我该如何在PySpark中从RDD中删除行呢?特别是第一行,因为我的数据集中第一行通常是列名。我查看了API,但似乎找不到简单的方法来做到这一点。当然,我可以通过Bash或HDFS来处理,但我只是想知道是否可以在PySpark内部完成这个操作。
6 个回答
我个人觉得,直接用过滤器来去掉这些东西是最简单的方法。不过根据你的评论,我还有另一种方法。可以把 RDD 处理成每个分区都是一个数组(我假设每个分区有一个文件,并且每个文件的第一行是需要去掉的那一行),然后就可以跳过第一个元素(这是用 Scala 的 API 来做的)。
data.glom().map(x => for (elem <- x.drop(1){/*做一些事情*/}) //x 是一个数组,所以直接跳过第 0 个索引
要记住,RDD 的一个重要特点是它们是不可变的,所以自然地删除一行数据是比较棘手的事情。
更新:
更好的解决方案。
rdd.mapPartions(x => for (elem <- x.drop(1){/*做一些事情*/})
这个方法和 glom 类似,但没有把所有东西放进数组的开销,因为在这种情况下,x 是一个迭代器。
我做了一些性能测试,得到了以下结果。
集群配置
集群
- 集群 1 : 4 个核心,16 GB 内存
- 集群 2 : 4 个核心,16 GB 内存
- 集群 3 : 4 个核心,16 GB 内存
- 集群 4 : 2 个核心,8 GB 内存
数据
有 700 万行数据,4 列
#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)
#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)
#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
if(index==0):
for subIndex,item in enumerate(iterator):
if subIndex > 0:
yield item
else:
yield iterator
data=data.mapPartitionsWithIndex(dropFirstRow)
我认为解决方案 3 是最具扩展性的。
在PySpark(Python的一个接口)中,有一个简单的方法可以实现这个,前提是你使用的是Python 3:
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
关于PySpark的具体内容:
根据@maasg的说法,你可以这样做:
header = rdd.first()
rdd.filter(lambda line: line != header)
不过,这样做并不完全正确,因为你可能会把包含数据的行和表头都排除掉。不过,这个方法对我来说是有效的:
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)
类似的:
rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
我刚接触Spark,所以不能聪明地评论哪个方法会更快。
据我所知,没什么“简单”的方法可以做到这一点。
不过,这段代码应该能解决问题:
val header = data.first
val rows = data.filter(line => line != header)