Python/Django数据库轮询出现内存泄漏

16 投票
4 回答
9947 浏览
提问于 2025-04-15 19:44

我有一个用Python写的脚本,它在Django框架下运行,用来处理数据库和内存缓存。不过,这个脚本是作为一个独立的守护进程在运行,也就是说,它并不响应网页服务器的请求。这个守护进程会检查一个Django模型叫做Requisition,找出状态为status=STATUS_NEW的对象,然后把它们的状态改为STATUS_WORKING,并放入一个队列中。

有几个进程(是用multiprocess这个包创建的)会从队列中取出任务,然后处理带有pr.id的Requisition。我觉得内存泄漏可能出现在下面的代码中(不过也有可能是在队列另一边的'Worker'代码中,但这种可能性不大,因为即使没有新的Requisition进来,内存使用量还是在增加,也就是说,当所有的工作进程都在等待Queue.get()时,内存也在增长)。

from requisitions.models import Requisition # our Django model
from multiprocessing import Queue

while True:
    # Wait for "N"ew requisitions, then pop them into the queue.
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

    time.sleep(settings.DAEMON_POLL_WAIT)

这里的settings.DAEMON_POLL_WAIT=0.01

看起来如果我让这个进程运行一段时间(比如几天),Python进程的内存使用量会无限增长,最终系统会耗尽内存。

这到底是怎么回事(或者我该怎么查找原因),更重要的是——怎么才能让这样的守护进程正常运行呢?

我首先想到的是改变这个函数的动态,特别是把检查新Requisition对象的部分放到django.core.cache cache中,也就是说:

from django.core.cache import cache

while True:
    time.sleep(settings.DAEMON_POLL_WAIT)
    if cache.get('new_requisitions'):
       # Possible race condition
       cache.clear()
       process_new_requisitions(queue)

 def process_new_requisitions(queue):
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

创建状态为status=STATUS_NEW的Requisitions的进程可以执行cache.set('new_requisitions', 1)(或者我们可以捕捉一个信号,或者在Requisition.save()事件中,当创建新的Requisition时,从那里设置缓存中的标志)。

不过我不确定我提出的这个解决方案是否能解决内存问题(这些问题可能与垃圾回收有关,所以通过process_new_requisitions的作用域可能会解决这个问题)。

我很感激任何想法和反馈。

4 个回答

3

我需要处理很多数据,所以我选择了使用多进程来解决这个问题,并利用进程池来应对内存占用过高的情况。

为了简单起见,我只是定义了一些“全局”的(就是在Python中顶层的意思)函数,而不是去尝试让所有东西都可以被序列化。

下面是一个抽象的示例:

import multiprocessing as mp

WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound

# this is a global function
def worker(params):
  # do stuff
  return something_for_the_callback_to_analyze

# this is a global function
def worker_callback(worker_return_value):
  # report stuff, or pass

# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
  # somehow define a collection
  while True:
    if len(collection) == 0:
      break
    # Take a slice
    pool_sub_batch = []
    for _ in range(WORKERS):
      if collection: # as long as there's still something in the collection
        pool_sub_batch.append( collection.pop() )
    # Start a pool, limited to the slice
    pool_size = WORKERS
    if len(pool_sub_batch) < WORKERS:
      pool_size = len(pool_sub_batch)
    pool = mp.Pool(processes=pool_size)
    for sub_batch in pool_sub_batch:
      pool.apply_async(worker, args = (sub_batch), callback = worker_callback)
    pool.close()
    pool.join()
    # Loop, more slices
6

这个守护进程的settings.py文件里有没有设置DEBUG = True?如果有的话,Django会把它运行过的所有SQL记录保存在内存里,这可能会导致内存泄漏。

40

你需要定期重置Django为了调试而保存的一些查询列表。通常情况下,这个列表会在每次请求后自动清空,但因为你的应用不是基于请求的,所以你需要手动清空这个列表:

from django import db

db.reset_queries()

另外,看看这些内容:

撰写回答