PySpark 删除行

28 投票
6 回答
53667 浏览
提问于 2025-04-18 13:06

我该如何在PySpark中从RDD中删除行呢?特别是第一行,因为我的数据集中第一行通常是列名。我查看了API,但似乎找不到简单的方法来做到这一点。当然,我可以通过Bash或HDFS来处理,但我只是想知道是否可以在PySpark内部完成这个操作。

6 个回答

1

我个人觉得,直接用过滤器来去掉这些东西是最简单的方法。不过根据你的评论,我还有另一种方法。可以把 RDD 处理成每个分区都是一个数组(我假设每个分区有一个文件,并且每个文件的第一行是需要去掉的那一行),然后就可以跳过第一个元素(这是用 Scala 的 API 来做的)。

data.glom().map(x => for (elem <- x.drop(1){/*做一些事情*/}) //x 是一个数组,所以直接跳过第 0 个索引

要记住,RDD 的一个重要特点是它们是不可变的,所以自然地删除一行数据是比较棘手的事情。

更新: 更好的解决方案。
rdd.mapPartions(x => for (elem <- x.drop(1){/*做一些事情*/})
这个方法和 glom 类似,但没有把所有东西放进数组的开销,因为在这种情况下,x 是一个迭代器。

3

我做了一些性能测试,得到了以下结果。

集群配置

集群

  • 集群 1 : 4 个核心,16 GB 内存
  • 集群 2 : 4 个核心,16 GB 内存
  • 集群 3 : 4 个核心,16 GB 内存
  • 集群 4 : 2 个核心,8 GB 内存

数据

有 700 万行数据,4 列

#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)

#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)

#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     if(index==0):
          for subIndex,item in enumerate(iterator):
               if subIndex > 0:
                    yield item
     else:
          yield iterator

data=data.mapPartitionsWithIndex(dropFirstRow)

我认为解决方案 3 是最具扩展性的。

5

在PySpark(Python的一个接口)中,有一个简单的方法可以实现这个,前提是你使用的是Python 3:

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
23

关于PySpark的具体内容:

根据@maasg的说法,你可以这样做:

header = rdd.first()
rdd.filter(lambda line: line != header)

不过,这样做并不完全正确,因为你可能会把包含数据的行和表头都排除掉。不过,这个方法对我来说是有效的:

def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)

类似的:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

我刚接触Spark,所以不能聪明地评论哪个方法会更快。

20

据我所知,没什么“简单”的方法可以做到这一点。

不过,这段代码应该能解决问题:

val header = data.first
val rows = data.filter(line => line != header)

撰写回答