如何删除PySpark中的RDD以释放资源?

2024-06-17 11:18:13 发布

您现在位置:Python中文网/ 问答频道 /正文

如果我有一个不再需要的RDD,如何从内存中删除它? 以下内容是否足以完成此任务:

del thisRDD

谢谢!


Tags: 内存rdddelthisrdd
3条回答

简短回答:下面的代码应该可以做到这一点:

import gc
del thisRDD
gc.collect()

说明:

即使您使用的是PySpark,RDD的数据也是在Java端管理的,所以首先让我们问同样的问题,但是对于Java而不是Python:

如果我使用Java,并且我只是释放对RDD的所有引用,那么这是否足以自动取消对RDD的持久化?

对于Java,答案是肯定的,根据this answer,当RDD被垃圾收集时,它将自动取消版本。(显然,该功能是在this PR中添加到Spark的。)

好吧,在Python中会发生什么?如果在Python中删除对RDD的所有引用,是否会导致在Java端删除它们?

PySpark使用Py4J将对象从Python发送到Java,反之亦然。根据Py4J Memory Model Docs

Once the object is garbage collected on the Python VM (reference count == 0), the reference is removed on the Java VM

但是请注意:删除对RDD的Python引用不会导致它立即被删除。您必须等待Python垃圾收集器清理引用。您可以阅读Py4J解释以了解详细信息,其中他们建议您执行以下操作:

A call to gc.collect() also usually works.

好,现在回到你原来的问题:

Would the following be enough to get this done:

del thisRDD

差不多。您应该删除对它的最后一个引用(即del thisRDD),然后,如果您真的需要立即取消RDD的版本**,请调用gc.collect()

**好吧,从技术上讲,这将立即删除Java端的引用,但在Java的垃圾收集器实际执行RDD的终结器并因此取消数据的持久化之前,会有一点延迟。

不,del thisRDD是不够的,它只会删除指向RDD的指针。您应该调用thisRDD.unpersist()来删除缓存的数据。

对于您的信息,Spark使用了一个懒惰计算模型,这意味着当您运行此代码时:

>>> thisRDD = sc.parallelize(xrange(10),2).cache()

您不会真正缓存任何数据,它只会标记为“以便在RDD执行计划中缓存”。你可以这样检查:

>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]

但当您至少调用一次此RDD上的操作时,它将被缓存:

>>> thisRDD.count()
10
>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 174.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]

您可以使用地址http://<driver_node>:4040/storage轻松检查Spark UI中的持久化数据和持久化级别。您将看到del thisRDD不会改变这个RDD的持久性,但是thisRDD.unpersist()会取消它的持久性,而您仍然可以在代码中使用这个RDD(虽然它不再在内存中持久,并且每次查询时都会重新计算)

简而言之:这要看情况而定。

根据pyspark v.1.3.0 source codedel thisRDD应该足以满足PipelinedRDD,这是Python mapper/reducer生成的RDD:

class PipelinedRDD(RDD):
    # ...
    def __del__(self):
        if self._broadcast:
            self._broadcast.unpersist()
            self._broadcast = None

另一方面,RDD类没有__del__方法(虽然它可能应该有),所以您应该自己调用unpersist方法。

编辑:__del__方法在this提交中被删除。

相关问题 更多 >