Python/Django数据库轮询出现内存泄漏
我有一个用Python写的脚本,它在Django框架下运行,用来处理数据库和内存缓存。不过,这个脚本是作为一个独立的守护进程在运行,也就是说,它并不响应网页服务器的请求。这个守护进程会检查一个Django模型叫做Requisition,找出状态为status=STATUS_NEW
的对象,然后把它们的状态改为STATUS_WORKING,并放入一个队列中。
有几个进程(是用multiprocess这个包创建的)会从队列中取出任务,然后处理带有pr.id
的Requisition。我觉得内存泄漏可能出现在下面的代码中(不过也有可能是在队列另一边的'Worker'代码中,但这种可能性不大,因为即使没有新的Requisition进来,内存使用量还是在增加,也就是说,当所有的工作进程都在等待Queue.get()时,内存也在增长)。
from requisitions.models import Requisition # our Django model
from multiprocessing import Queue
while True:
# Wait for "N"ew requisitions, then pop them into the queue.
for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
pr.set_status(pr.STATUS_WORKING)
pr.save()
queue.put(pr.id)
time.sleep(settings.DAEMON_POLL_WAIT)
这里的settings.DAEMON_POLL_WAIT=0.01
。
看起来如果我让这个进程运行一段时间(比如几天),Python进程的内存使用量会无限增长,最终系统会耗尽内存。
这到底是怎么回事(或者我该怎么查找原因),更重要的是——怎么才能让这样的守护进程正常运行呢?
我首先想到的是改变这个函数的动态,特别是把检查新Requisition对象的部分放到django.core.cache cache
中,也就是说:
from django.core.cache import cache
while True:
time.sleep(settings.DAEMON_POLL_WAIT)
if cache.get('new_requisitions'):
# Possible race condition
cache.clear()
process_new_requisitions(queue)
def process_new_requisitions(queue):
for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
pr.set_status(pr.STATUS_WORKING)
pr.save()
queue.put(pr.id)
创建状态为status=STATUS_NEW
的Requisitions的进程可以执行cache.set('new_requisitions', 1)
(或者我们可以捕捉一个信号,或者在Requisition.save()事件中,当创建新的Requisition时,从那里设置缓存中的标志)。
不过我不确定我提出的这个解决方案是否能解决内存问题(这些问题可能与垃圾回收有关,所以通过process_new_requisitions
的作用域可能会解决这个问题)。
我很感激任何想法和反馈。
4 个回答
我需要处理很多数据,所以我选择了使用多进程来解决这个问题,并利用进程池来应对内存占用过高的情况。
为了简单起见,我只是定义了一些“全局”的(就是在Python中顶层的意思)函数,而不是去尝试让所有东西都可以被序列化。
下面是一个抽象的示例:
import multiprocessing as mp
WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound
# this is a global function
def worker(params):
# do stuff
return something_for_the_callback_to_analyze
# this is a global function
def worker_callback(worker_return_value):
# report stuff, or pass
# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
# somehow define a collection
while True:
if len(collection) == 0:
break
# Take a slice
pool_sub_batch = []
for _ in range(WORKERS):
if collection: # as long as there's still something in the collection
pool_sub_batch.append( collection.pop() )
# Start a pool, limited to the slice
pool_size = WORKERS
if len(pool_sub_batch) < WORKERS:
pool_size = len(pool_sub_batch)
pool = mp.Pool(processes=pool_size)
for sub_batch in pool_sub_batch:
pool.apply_async(worker, args = (sub_batch), callback = worker_callback)
pool.close()
pool.join()
# Loop, more slices
这个守护进程的settings.py文件里有没有设置DEBUG = True
?如果有的话,Django会把它运行过的所有SQL记录保存在内存里,这可能会导致内存泄漏。
你需要定期重置Django为了调试而保存的一些查询列表。通常情况下,这个列表会在每次请求后自动清空,但因为你的应用不是基于请求的,所以你需要手动清空这个列表:
from django import db
db.reset_queries()
另外,看看这些内容:
"使用TrackRefs和Guppy调试Django内存泄漏",作者是Mikko Ohtamaa:
Django会记录所有的查询,以便于调试(connection.queries)。这个列表会在每次HTTP请求结束时被重置。但在独立模式下,没有请求,所以你需要在每个工作周期结束后手动重置查询列表。
"为什么Django会泄漏内存?" 在Django常见问题解答中 - 这里提到了将
DEBUG
设置为False
的重要性,以及如何使用db.reset_queries()
来清空查询列表,这在像你这样的应用中非常重要。