Python/Django数据库轮询内存问题

2024-04-25 20:48:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个Python脚本为database和memcache运行Django,但它显然是作为一个独立的守护进程运行的(即不响应webserver请求)。守护进程检查Django模型对带有status=STATUS_NEW的对象的请求,然后将它们标记为工作状态并将它们放入队列中。

许多进程(使用多进程包创建)将从队列中提取内容,并使用传递给队列的pr.id处理请求。我相信内存泄漏可能在下面的代码中(但它可能在队列另一侧的“Worker”代码中,尽管这不太可能,因为即使没有出现任何请求,也会增加内存大小,即当workers都在Queue.get()上阻塞时)。

from requisitions.models import Requisition # our Django model
from multiprocessing import Queue

while True:
    # Wait for "N"ew requisitions, then pop them into the queue.
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

    time.sleep(settings.DAEMON_POLL_WAIT)

其中settings.DAEMON_POLL_WAIT=0.01

如果我让它运行一段时间(即几天),Python进程将增长到无限大,最终系统将耗尽内存。

这里发生了什么(或者我怎么知道),更重要的是-你怎么能运行一个守护进程来做这个?

我的第一个想法是改变函数的动态性,特别是将对新请求对象的检查放入一个django.core.cache cache,即

from django.core.cache import cache

while True:
    time.sleep(settings.DAEMON_POLL_WAIT)
    if cache.get('new_requisitions'):
       # Possible race condition
       cache.clear()
       process_new_requisitions(queue)

 def process_new_requisitions(queue):
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)

使用status=STATUS_NEW创建申请的进程可以执行cache.set('new_requisitions', 1)(或者,我们可以捕获正在创建新申请的signal或Requisition.save()事件,然后从那里在缓存中设置标志)。

但是,我不确定我在这里提出的解决方案是否解决了内存问题(这可能与垃圾收集有关,因此通过process_new_requisitions的范围界定可以解决这个问题)。

我很感激你的任何想法和反馈。


Tags: django内存fromidcachenew队列queue
3条回答

我有很多数据处理工作要做,所以,我解决这个问题的方法是使用多处理,并使用池来抵消正在发生的任何内存膨胀。

为了保持简单,我只是定义了一些“全局”(顶级,不管Python中的术语是什么)函数,而不是试图使事情变得可pickle。

这里是抽象形式:

import multiprocessing as mp

WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound

# this is a global function
def worker(params):
  # do stuff
  return something_for_the_callback_to_analyze

# this is a global function
def worker_callback(worker_return_value):
  # report stuff, or pass

# My multiprocess_launch was inside of a class
def multiprocess_launcher(params):
  # somehow define a collection
  while True:
    if len(collection) == 0:
      break
    # Take a slice
    pool_sub_batch = []
    for _ in range(WORKERS):
      if collection: # as long as there's still something in the collection
        pool_sub_batch.append( collection.pop() )
    # Start a pool, limited to the slice
    pool_size = WORKERS
    if len(pool_sub_batch) < WORKERS:
      pool_size = len(pool_sub_batch)
    pool = mp.Pool(processes=pool_size)
    for sub_batch in pool_sub_batch:
      pool.apply_async(worker, args = (sub_batch), callback = worker_callback)
    pool.close()
    pool.join()
    # Loop, more slices

您需要定期重置Django为调试目的保留的查询列表。通常在每次请求后都会清除它,但由于您的应用程序不是基于请求的,因此您需要手动执行此操作:

from django import db

db.reset_queries()

另见:

  • 米科的"Debugging Django memory leak with TrackRefs and Guppy" 奥塔玛:

    Django keeps track of all queries for debugging purposes (connection.queries). This list is reseted at the end of HTTP request. But in standalone mode, there are no requests. So you need to manually reset to queries list after each working cycle

  • "Why is Django leaking memory?" in Django FAQ-两者都能说 关于将始终重要的DEBUG设置为False,以及 关于使用db.reset_queries()清除查询列表, 在像你这样的应用中很重要。

守护进程的settings.py文件是否有DEBUG = True?如果是这样的话,Django会在内存中保存到目前为止运行的所有SQL的记录,这可能会导致内存泄漏。

相关问题 更多 >