SystemError while running subprocesses using multiprocessing
I'm hitting a SystemError (shown below) while using the multiprocessing package to run some simple numpy-based matrix algebra computations in parallel. My code works fine for smaller matrix sizes but crashes for larger ones, despite plenty of available memory.
The matrix sizes I use are considerable (my code runs fine with dense 1000000x10 float matrices but crashes with 1000000x500 ones; I pass these matrices to and from the subprocesses, by the way). 10 vs. 500 is a runtime parameter; everything else stays the same (input data, other runtime parameters, etc.).
I also tried running the same (ported) code with python3: for the larger matrices the subprocesses go to sleep/idle (instead of crashing as they do in python 2.7) and the program/subprocesses just hang there doing nothing. For smaller matrices the code runs fine with python3.
Any suggestions would be highly appreciated (I'm running out of ideas here).
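In short, the failing pattern boils down to "ship big matrices to pool workers through pool.map", something like this stripped-down sketch (my own reduction, just to show the shape of the problem; the full code is further below):

import multiprocessing as mp
import numpy as np

def child(param):
    matrix, queue = param
    # The matrix only arrives here after being pickled and piped by pool.map.
    queue.put(matrix.shape)

if __name__ == '__main__':
    queue = mp.Manager().Queue()
    big = np.zeros((1000000, 500))    # ~4 GB as float64
    pool = mp.Pool(processes=2)
    pool.map(child, [(big, queue)])   # the pickle of 'big' travels through a pipe here
    pool.close()
    pool.join()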
Error message:
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
SystemError: NULL result without error in PyObject_Call
The multiprocessing code I use:
import multiprocessing as mp

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add result queue to the list of argument tuples.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create and initialize the pool of workers.
    pool = mp.Pool(processes = nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    # Run the processes.
    pool.close()
    pool.join()
    # Return the results.
    return [resultQueue.get() for i in range(len(listOfInputs))]
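To make the calling convention concrete, here is a toy, self-contained call (squareProc is made up for illustration; the real proc, solveForLFV, follows below). Each worker unpacks its argument tuple plus the shared queue, and puts one result on the queue:

import multiprocessing as mp

def squareProc(param):
    (x,), queue = param
    queue.put(x * x)

if __name__ == '__main__':
    out = runProcessesInParallelAndReturn(squareProc, [(i,) for i in range(8)], 4)
    print(sorted(out))   # [0, 1, 4, 9, 16, 25, 36, 49] (queue order is arbitrary)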
Below is the "proc" that is executed for each subprocess. Basically, it solves many systems of linear equations using numpy (it constructs the required matrices inside the subprocess) and returns the results as another matrix. Once again, it works fine for smaller values of one runtime parameter but crashes (or hangs in python3) for larger ones.
import sys
import time
import numpy as np

def solveForLFV(param):
    startTime = time.time()
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV: ', time.time() - startTime, 'sec'
    sys.stdout.flush()
1 Answer
500,000,000 is pretty big: if you're using float64, that's 4 billion bytes, or about 4 GB of memory. (The 10,000,000-float array, by contrast, is only 80 million bytes, around 80 MB, much smaller.) I expect the problem has to do with multiprocessing trying to pickle the arrays and send them to the child processes through a pipe.
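A quick sanity check of that arithmetic (float64 assumed, sizes taken from the question), without actually allocating anything:

import numpy as np

# 1,000,000 x 500 float64 elements: 500,000,000 * 8 bytes = 4,000,000,000 bytes (~4 GB)
print(1000000 * 500 * np.dtype(np.float64).itemsize)   # 4000000000
# 1,000,000 x 10 float64 elements: 10,000,000 * 8 bytes = 80,000,000 bytes (~80 MB)
print(1000000 * 10 * np.dtype(np.float64).itemsize)    # 80000000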
Since you're on a unix platform, you can avoid this problem by exploiting the memory-inheritance behavior of fork() (which multiprocessing uses to create its workers). I've had great success with this hack (ripped out of this project); the details are explained in the comments.
import itertools
import os
import random
import string

### A helper for letting the forked processes use data without pickling.
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10))
    for _ in itertools.count())

class ForkedData(object):
    '''
    Class used to pass data to child processes in multiprocessing without
    really pickling/unpickling it. Only works on POSIX.

    Intended use:
        - The master process makes the data somehow, and does e.g.
            data = ForkedData(the_value)
        - The master makes sure to keep a reference to the ForkedData object
          until the children are all done with it, since the global reference
          is deleted to avoid memory leaks when the ForkedData object dies.
        - Master process constructs a multiprocessing.Pool *after*
          the ForkedData construction, so that the forked processes
          inherit the new global.
        - Master calls e.g. pool.map with data as an argument.
        - Child gets the real value through data.value, and uses it read-only.
    '''
    # TODO: does data really need to be used read-only? don't think so...
    # TODO: more flexible garbage collection options
    def __init__(self, val):
        g = globals()
        self.name = next(n for n in _data_name_cands if n not in g)
        g[self.name] = val
        self.master_pid = os.getpid()

    @property
    def value(self):
        return globals()[self.name]

    def __del__(self):
        if os.getpid() == self.master_pid:
            del globals()[self.name]
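A hypothetical usage sketch following the docstring above (worker and the array sizes are my own illustration, not your code; POSIX only, since it relies on fork()):

import multiprocessing as mp
import numpy as np

def worker(args):
    data, row = args
    # Only the tiny ForkedData handle is pickled and piped; the big array is
    # reached through the global that the child inherited at fork() time.
    return data.value[row].sum()

if __name__ == '__main__':
    big = np.random.rand(1000, 500)   # stand-in for the real matrix
    data = ForkedData(big)            # keep this reference alive until workers finish
    pool = mp.Pool(processes=4)       # created *after* ForkedData, so workers inherit it
    results = pool.map(worker, [(data, i) for i in range(10)])
    pool.close()
    pool.join()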