Python 多进程内存增加问题
我有一个程序,它应该一直运行下去。以下是我正在做的事情:
from myfuncs import do, process
class Worker(multiprocessing.Process):
def __init__(self, lock):
multiprocesing.Process.__init__(self)
self.lock = lock
self.queue = Redis(..) # this is a redis based queue
self.res_queue = Redis(...)
def run():
while True:
job = self.queue.get(block=True)
job.results = process(job)
with self.lock:
post_process(self.res_queue, job)
def main():
lock = multiprocessing.Semaphore(1)
ps = [Worker(lock) for _ in xrange(4)]
[p.start() for p in ps]
[p.join() for p in ps]
self.queue 和 self.res_queue 是两个对象,它们的工作方式类似于 Python 标准库中的队列,但它们使用 Redis 数据库作为后端。
process 函数对工作携带的数据进行一些处理(主要是解析 HTML),然后返回一个字典。
post_process 函数通过检查一些条件将工作写入另一个 Redis 队列(一次只能有一个进程检查这些条件,所以需要加锁)。它返回 True 或 False。
这个程序每天使用的内存在不断增加。有人能搞清楚这是怎么回事吗?
当工作在 run 方法中超出作用域时,内存应该是可以释放的,对吧?
2 个回答
如果你找不到内存泄漏的原因,可以通过限制每个工作进程处理的任务数量来解决这个问题。也就是说,当一个工作进程完成了它能处理的最大任务后,你可以让它退出,然后用一个新的工作进程替换它。Python自带的 multiprocessing.Pool
对象就支持这种做法,你可以使用 maxtasksperchild
这个参数来设置。你可以这样做:
import multiprocessing
import threading
class WorkerPool(object):
def __init__(self, workers=multiprocessing.cpu_count(),
maxtasksperchild=None, lock=multiprocessing.Semaphore(1)):
self._lock = multiprocessing.Semaphore(1)
self._max_tasks = maxtasksperchild
self._workers = workers
self._pool = []
self._repopulate_pool()
self._pool_monitor = threading.Thread(self._monitor_pool)
self._pool_monitor.daemon = True
self._pool_monitor.start()
def _monitor_pool(self):
""" This runs in its own thread and monitors the pool. """
while True:
self._maintain_pool()
time.sleep(0.1)
def _maintain_pool(self):
""" If any workers have exited, start a new one in its place. """
if self._join_exited_workers():
self._repopulate_pool()
def _join_exited_workers(self):
""" Find exited workers and join them. """
cleaned = False
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
worker.join()
cleaned = True
del self._pool[i]
return cleaned
def _repopulate_pool(self):
""" Start new workers if any have exited. """
for i in range(self._workers - len(self._pool)):
w = Worker(self._lock, self._max_tasks)
self._pool.append(w)
w.start()
class Worker(multiprocessing.Process):
def __init__(self, lock, max_tasks):
multiprocesing.Process.__init__(self)
self.lock = lock
self.queue = Redis(..) # this is a redis based queue
self.res_queue = Redis(...)
self.max_tasks = max_tasks
def run():
runs = 0
while self.max_tasks and runs < self.max_tasks:
job = self.queue.get(block=True)
job.results = process(job)
with self.lock:
post_process(self.res_queue, job)
if self.max_tasks:
runs += 1
def main():
pool = WorkerPool(workers=4, maxtasksperchild=1000)
# The program will block here since none of the workers are daemons.
# It's not clear how/when you want to shut things down, but the Pool
# can be enhanced to support that pretty easily.
需要注意的是,上面的池监控代码几乎和 multiprocessing.Pool
中用于同样目的的代码一模一样。
当任务在运行方法中超出范围时,内存应该被释放,对吗?
首先,范围是整个 run
方法,而这个方法是无限循环的,所以这根本不会发生。(而且,当你退出 run
方法时,进程会关闭,内存也会被释放……)
即使真的超出了范围,也不意味着你想的那样。Python 和 C++ 不一样,C++ 有些变量的存储是在栈上的,而 Python 中所有对象都在堆上,它们会一直存在,直到没有任何引用指向它们。变量超出范围意味着这个变量不再指向它之前指向的对象。如果这个变量是唯一指向该对象的引用,那么它会被释放*,但如果你在其他地方还有其他引用,那么这个对象就不能被释放,直到那些其他引用消失。
同时,超出范围并没有什么神奇的地方。任何让变量停止指向一个对象的方式都有相同的效果——无论是变量超出范围、你调用 del
,还是你给它赋一个新值。因此,每次循环时,当你执行 job =
时,你实际上是在放弃之前对 job
的引用,即使没有任何东西超出范围。(但要记住,在峰值时你会有 两个 job 存在,而不是一个,因为新的 job 是在旧的被释放之前从队列中取出的。如果这是个问题,你可以在阻塞队列之前先执行 job = None
。)
所以,假设问题确实出在 job
对象(或它拥有的某些东西)上,问题在于你没有展示的某些代码在某个地方保留了对它的引用。
在不知道你具体在做什么的情况下,很难建议解决方案。可能只是“不要把它存储在那里”。或者“存储一个弱引用而不是对象本身”。或者“添加一个 LRU 算法”。或者“添加一些流控制,以便如果你积压太多工作,就不会继续增加工作直到内存耗尽”。
* 在 CPython 中,这会立即发生,因为垃圾回收是基于引用计数的。而在 Jython 和 IronPython 中,垃圾回收则依赖于底层虚拟机的垃圾回收器,因此对象不会被释放,直到 JVM 或 CLR 注意到它不再被引用,这通常不是立即的,并且是不可预测的。