Python cannot allocate memory using multiprocessing.pool
My code (part of a genetic optimization algorithm) runs a few processes simultaneously, waits for all of them to finish, reads the output, and then repeats with different inputs. When I tested with 60 repetitions everything worked fine, so I decided to use a more realistic number of repetitions, 200. I received this error:
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 302, in _handle_workers
    pool._maintain_pool()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 206, in _maintain_pool
    self._repopulate_pool()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 199, in _repopulate_pool
    w.start()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 130, in start
    self._popen = Popen(self)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 120, in __init__
    self.pid = os.fork()
OSError: [Errno 12] Cannot allocate memory
Here is a snippet of my code that uses the pool:
def RunMany(inputs):
    from multiprocessing import cpu_count, Pool
    proc=inputs[0]
    pool=Pool(processes = proc)
    results=[]
    for arg1 in inputs[1]:
        for arg2 in inputs[2]:
            for arg3 in inputs[3]:
                results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
    casenum=0
    datadict=dict()
    for p in results:
        #get results of simulation once it has finished
        datadict[casenum]=p.get()
        casenum+=1
    return datadict
The RunOne function creates an object of a class I defined, solves a chemistry problem using a computationally expensive Python package (each solve takes about 30 seconds), and returns the chemistry solver's output object.
So my code calls RunMany sequentially, and RunMany in turn calls RunOne in parallel. In my tests, I called RunOne using 10 processors (my machine has 16) and a pool of 20 calls; in other words, len(arg1)*len(arg2)*len(arg3)=20. Everything worked fine when my code called RunMany 60 times, but it ran out of memory when I called it 200 times.
Does this mean some process isn't cleaning up after itself? Do I have a memory leak? How can I tell whether I have a memory leak, and how do I find its cause? The only thing that grows over my 200-repetition loop is a list of numbers, which goes from 0 to 200 entries in length. I do have a dictionary of objects of a custom class, but it is capped at 50 entries: each pass through the loop deletes an item from the dictionary and replaces it with another.
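One quick way to check whether worker processes are being leaked (a minimal sketch I'm adding for illustration, not code from the original post) is to count the interpreter's live child processes with the standard-library call multiprocessing.active_children() before and after a pool is supposed to be gone:

```python
import multiprocessing
from multiprocessing import Pool

def live_children():
    # number of live (un-reaped) child processes of this interpreter
    return len(multiprocessing.active_children())

if __name__ == "__main__":
    pool = Pool(processes=2)
    print(live_children())   # the 2 pool workers are alive here

    pool.close()
    pool.join()
    print(live_children())   # 0 once the pool is properly shut down
```

If the count keeps climbing across repetitions, old workers are not exiting, and each fork() needs more memory until Errno 12 appears.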
Edit: Here is the snippet of code that calls RunMany:
for run in range(nruns):
    #create inputs object for RunMany using genetic methods.
    #Either use starting "population" or create "child" inputs from successful previous runs
    datadict = RunMany(inputs)
    sumsquare=0
    for i in range(len(datadictsenk)): #input condition
        sumsquare+=Compare(datadict[i],Target[i]) #compare result to target
    with open(os.path.join(mainpath,'Outputs','output.txt'),'a') as f:
        f.write('\t'.join([str(x) for x in [inputs.name, sumsquare]])+'\n')
    Objective.append(sumsquare) #add sum of squares to list, to be plotted outside of loop
    population[inputs]=sumsquare #add/update the model in the "population", using the inputs object as a key, and its objective function as the value
    if len(population)>initialpopulation:
        population = PopulationReduction(population) #reduce the "population" by "killing" unfit "genes"
    avgtime=(datetime.datetime.now()-starttime2)//(run+1)
    remaining=(nruns-run-1)*avgtime
    print(' Finished '+str(run+1)+' / ' +str(nruns)+'. Elapsed: '+str(datetime.datetime.now().replace(microsecond=0)-starttime)+' Remaining: '+str(remaining)+' Finish at '+str((datetime.datetime.now()+remaining).replace(microsecond=0))+'~~~', end="\r")
1 Answer
As mentioned in the comments on my question, this answer came from Puciek.
The solution was to close the pool of processes once it has finished. I had assumed the pool would be closed automatically because the results variable is only used inside RunMany and would be deleted when the function returned, but Python doesn't always work the way I expect.
The fixed code is:
def RunMany(inputs):
    from multiprocessing import cpu_count, Pool
    proc=inputs[0]
    pool=Pool(processes = proc)
    results=[]
    for arg1 in inputs[1]:
        for arg2 in inputs[2]:
            for arg3 in inputs[3]:
                results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
    #new section
    pool.close()
    pool.join()
    #end new section
    casenum=0
    datadict=dict()
    for p in results:
        #get results of simulation once it has finished
        datadict[casenum]=p.get()
        casenum+=1
    return datadict
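On Python 3.3+ the pool can also be used as a context manager, but note that its __exit__ calls terminate(), not close(), so the explicit close()/join() pair is still what gives a graceful shutdown. A minimal sketch of the same pattern (run_one here is a cheap stand-in for the expensive chemistry solver, not the original RunOne):

```python
from multiprocessing import Pool

def run_one(x):
    # stand-in for the expensive solver (assumption for this sketch)
    return x * x

def run_many(args, processes=4):
    # The with statement guarantees the pool is terminated even on error;
    # close()+join() inside it still does the graceful shutdown that
    # prevents worker processes from piling up across repeated calls.
    with Pool(processes=processes) as pool:
        results = [pool.apply_async(run_one, (a,)) for a in args]
        pool.close()   # no more tasks will be submitted
        pool.join()    # wait for all workers to exit and be reaped
        return [r.get() for r in results]

if __name__ == "__main__":
    print(run_many(range(5)))   # → [0, 1, 4, 9, 16]
```

Calling run_many in a loop then creates and fully tears down one pool per call, so the number of live child processes stays bounded no matter how many repetitions run.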