管理不同内存使用的Python多进程
我用一个简单的RabbitMQ队列来把任务分发给工作进程。每个工作进程会使用一组multiprocessing
实例,这样可以同时处理多个任务,尽量利用内存和CPU。
问题是,有些任务占用的内存比其他任务多得多,所以如果工作进程同时启动多个实例,就会崩溃。但是在工作进程处理那些占用内存较大的任务时,我希望它也能处理其他占用内存较少的任务,这样可以利用剩下的CPU。
一个想法是使用多个队列或主题,但我想知道推荐的做法是什么。我能在进程崩溃之前捕捉到内存不足的错误吗?
解决这个问题的正确方法是什么?
[更新]
整个系统将由多台多核机器组成,但每台多核机器上只运行一个工作程序,这个程序会根据核心数创建相应数量的multiprocessing
实例。不同的机器之间是独立的,除了它们从同一个队列获取任务。
1 个回答
我觉得尝试捕捉和处理内存溢出(OOM)错误会非常困难,甚至可能是不可能的。你需要一个线程或者进程一直在监控内存使用情况,当它发现内存使用过高时,应该...做什么呢?杀掉正在处理任务的进程?还是尝试暂停它(如果可能的话;这可能取决于你的任务在做什么)。即使这样,暂停也不会释放任何内存。你必须释放内存,并在安全的时候重新启动任务,这意味着你需要重新排队,决定什么时候安全等等。
与其尝试检测和恢复这个问题,我建议还是尽量避免它的发生。可以创建两个队列和两个池子。一个队列/池子用来处理高内存任务,另一个用来处理低内存任务。高内存池子里只放一个进程,这样它就只能同时运行一个任务,从而节省内存。而低内存队列里可以放 multiprocessing.cpu_count() - 1
个进程,这样可以让你的CPU在两个池子之间充分利用。
这种方法的一个潜在问题是,如果高内存队列用完了,而低内存任务还在等待,你就会浪费一个CPU。你可以以非阻塞的方式处理高内存队列(或者设置一个超时),这样如果在你准备处理任务时高内存队列是空的,你就可以选择一个低内存任务来处理。然后在处理完后,再检查一下高内存队列。
大概是这样的:
import multiprocessing
# hi_q and lo_q are placeholders for whatever library you're using to consume from RabbitMQ
def high_mem_consume():
while True:
task = hi_q.consume(timeout=2)
if not task:
lo_q.consume(timeout=2)
if task:
process_task(task)
def low_mem_consume():
while True:
task = lo_q.consume() # Blocks forever
process_task(task)
if __name__ == "__main__":
hi_pool = multiprocessing.Pool(1)
lo_pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)
hi_pool.apply_async(high_mem_consume)
lo_pool.apply_async(lo_mem_consume)