循环中池的内存使用过高
我有一个包含两个池的循环:
if __name__ == '__main__':
for length in range(1, 15, 5):
def map_CCWP(it):
return CCWP(G, length, Ep)
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
Scores = pool.map(map_CCWP, range(R))
S = []
# some work to get S
def map_AvgIAC (it):
return avgIAC(G, S, Ep, I)
pool2 = multiprocessing.Pool(processes=multiprocessing.cpu_count())
T = pool2.map(map_AvgIAC, range(4))
但是在运行这个循环时,内存使用量越来越多,可能是因为每次都创建新的池工作者。我尝试在每次循环结束时删除池,但内存使用量还是在增加。
另一个选择是把池放在一个条件下:
if pool == None:
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
这样确实不会消耗那么多内存。不过,每次循环中,函数 map_CCWP
和 map_AvgIAC
的参数都会变化,而我发现 pool.map
会使用 map_CCWP
的初始 length
和 map_AvgIAC
的初始 S
。
我该如何在每次循环中使用不同的函数,并且不增加内存使用呢?
2 个回答
看起来你在程序运行的时候创建了越来越多的池子。那如果你在循环之前先初始化一个固定大小的池子呢?然后在循环里只往这个池子里添加任务。这样一来,从概念上讲,你的并行处理就被池子的大小限制了,所以内存的使用量应该会得到控制。
默认情况下,工作池中的工人(worker)是在开始时创建的,并且会一直活到结束。因为你没有初始化它们,所以在你的情况下,让它们一直保持活着并没有太大的性能优势。
所以:
pool = multiprocessing.Pool(processes=None, maxtasksperchild=1)
这段代码会创建一个工人,执行任务,然后把它杀掉,再创建一个新的工人。任何占用的内存或资源都会被释放。如果你的内存增长不是太大,可以增加每个工人处理的任务数量。
注意,我把进程的数量定义为 None
。这和使用 multiprocessing.cpu_count()
是一样的,但写起来更简洁。
在其他情况下,我发现有时候(在几百万次中一两次),某个随机的工人会疯狂地占用内存,机器开始交换内存,结果一切都卡住或者变得非常慢。我的解决方法是:
iterations = int(math.ceil(total / b_size))
for block in xrange(iterations):
restricted_iterator = iterator[block * b_size:(block + 1) * b_size]
# This works because a slice can end beyond the length of the list.
pool = multiprocessing.Pool(processes=None, maxtasksperchild=1)
try:
peaks = pool.map(caller, restricted_iterator)
except Exception as e:
raise e # I don't expect this to ever happen.
finally:
pool.terminate()
# Kill the pool very dead.
# Save the data to disk and free memory.
我把工作分成小块,逐个处理。这样,如果有哪个“疯狂的工人”在占用内存,其他的工人可以在几分钟内完成任务,那个占用内存的工人就会单独待着,能使用更多的内存。因此,它会在不到几分钟内完成,这样程序的总延迟就不会太大。通过调整 b_size
,我可以控制清理的频率。(在我的情况下,把工作分成10到20块,中间保存到磁盘,我的平均CPU使用率仍然保持在97%左右,所以损失不大。)