Python parallel execution - how to debug efficiently?
Below is a Python program that shows how to use multiprocessing.Pool to execute a function func in parallel. There are Np elements to process, and func simply returns Np minus the index currently being processed. As you can see, I use a queue to return the function's values when running in parallel mode.
If I set runParallel=False, the program runs in serial mode instead.
The program works fine with both runParallel=False and runParallel=True, but now for my key question: as you can see, if I set problemIndex to something smaller than Np (e.g. problemIndex=7), I trigger an exception, namely a division by zero. Silly, I know! :-)
With runParallel=False I can see the line number of the offending code and go straight to the error:
$ python map.py
Traceback (most recent call last):
File "map.py", line 63, in <module>
a = func(argList[p])
File "map.py", line 22, in func
ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Great!
But with runParallel=True all I get is the "Bummer" message, with no information at all about where the error came from. That is really annoying!
My question: how can I debug effectively with runParallel=True and get the line number of the offending code back out of the Pool()?
#!/usr/bin/python
# map.py
import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True
# Problematic index - if less than Np we create an exception
problemIndex = 13
# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1, 4))
    ret = args["Np"] - args["index"]
    # Emulate a bug
    if args["index"] == args["problemIndex"]:
        ret = 1/(args["index"] - args["problemIndex"])
    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"], ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args = {}
    args["index"] = i                    # index
    args["Np"] = Np                      # Number of problems
    args["q"] = q                        # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)

# Should we run parallel?
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    ret.wait()
    qLen = q.qsize()
    p.close()
    if not qLen == Np:
        print "Bummer - one or more worker threads broke down", Np, qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i, a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives", resultVector[i]
3 Answers
John Greenall provided the best solution and has received the bounty.
The reason is that his solution does not put a try/except around the core of the code, i.e. the whole "func" part, the way radu.ciorba showed. That other approach works as well, though.
Since John's solution did not fit my question exactly, I am posting a solution for my own code that applies John's idea. Thanks again John, and thanks Radu!
#!/usr/bin/python
# map.py solution
import time
import multiprocessing
import sys
import random
import logging
import traceback

# Toggle whether we run parallel or not
runParallel = True
# Problematic index - if less than Np we create an exception
problemIndex = 14
# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1, 4))
    ret = args["Np"] - args["index"]
    # Emulate a bug
    if args["index"] == args["problemIndex"]:
        ret = 1/(args["index"] - args["problemIndex"])
    # Return data
    return (args["index"], ret)

def mpFunctionReportError(args):
    # Wrap func so that any exception is caught inside the worker and sent
    # back through the queue together with its formatted traceback
    q = args["q"]
    rslt = {"index": args["index"],
            "args": None,
            "error": None,
            "traceback": None}
    try:
        rslt["result"] = func(args)
        q.put(rslt)
    except Exception as e:
        rslt["result"] = None
        rslt["error"] = e
        rslt["args"] = str(args)
        rslt["traceback"] = traceback.format_exc()
        q.put(rslt)

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args = {}
    args["index"] = i                    # index
    args["Np"] = Np                      # Number of problems
    args["q"] = q                        # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)

resultVector = [None]*Np

# Should we run parallel?
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(mpFunctionReportError, argList)
    # Wait until error or done
    ret.wait()
    # Queue size
    qLen = q.qsize()
    p.close()
    # Collect failed jobs here
    bugList = {}
    # Loop the queue
    for i in range(qLen):
        # Pop a value
        returnVal = q.get()
        # Check for the error code
        if returnVal["error"] is not None:
            bugList[returnVal["index"]] = returnVal
        else:
            # result is an (index, value) tuple
            resultVector[returnVal["index"]] = returnVal["result"][1]
    # Print the list of errors
    if bugList:
        print "-"*70
        print "Some parts of the parallel execution broke down. Error list:"
        print "-"*70
        for i in bugList:
            print "Index :", bugList[i]["index"]
            print "Error code :", bugList[i]["error"]
            print "Traceback :", bugList[i]["traceback"]
            print "Args :", bugList[i]["args"]
            print "-"*70
        sys.exit(0)
else:
    # Serial mode: func returns an (index, value) tuple
    for p in range(Np):
        (i, a) = func(argList[p])
        resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives", resultVector[i]
With "runParallel = True" and "problemIndex = 4" we now get the full traceback:
----------------------------------------------------------------------
Some parts of the parallel execution broke down. Error list:
----------------------------------------------------------------------
Index : 4
Error code : integer division or modulo by zero
Traceback : Traceback (most recent call last):
File "fix3.py", line 44, in mpFunctionReportError
rslt["result"] = func(args)
File "fix3.py", line 26, in func
ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Args : {'Np': 10, 'index': 4, 'problemIndex': 4, 'q': <AutoProxy[Queue] object, typeid 'Queue' at 0xb708710c>, 'runParallel': True}
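A small follow-up note (my own addition, not from the original thread): with map_async you can also call ret.get() instead of ret.wait(), which re-raises the first worker exception in the parent process. Under Python 2 the re-raised traceback points at the get() call rather than at the failing line inside func, which is exactly why the format_exc text carried back through the queue is the part that matters; newer Python 3 releases additionally attach the worker's traceback text to the re-raised exception. A minimal sketch against the unwrapped func (the wrapper above deliberately never raises), assuming func and argList are defined as in the solution:
# Hypothetical check; a fresh Pool is used because the one above has
# already been close()d.
pool = multiprocessing.Pool(processes=10)
ret = pool.map_async(func, argList)
try:
    ret.get()  # unlike ret.wait(), this re-raises a worker's exception here
except ZeroDivisionError as e:
    print "A worker raised:", e
    # Under Python 2 the traceback attached to this exception points at get(),
    # not at the failing line inside func; the traceback text collected via the
    # queue (as in the solution above) is what shows the real location.
pool.close()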
It is not particularly elegant, but how about something like this:
import logging

def func(args):
    try:
        # Emulate that the function might be fast or slow
        time.sleep(random.randint(1, 4))
        ret = args["Np"] - args["index"]
        # Emulate a bug
        if args["index"] == args["problemIndex"]:
            ret = 1/(args["index"] - args["problemIndex"])
        # Return data
        if args["runParallel"]:
            # We use a queue thus ordering may not be protected
            args["q"].put((args["index"], ret))
        else:
            return ret
    except Exception as e:
        # Log the full traceback inside the worker before re-raising
        logging.exception(e)
        raise
The output should look something like this (with the problem index set to 9):
ERROR:root:integer division or modulo by zero
Traceback (most recent call last):
File "/home/rciorba/test.py", line 26, in func
ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Bummer - one or more worker threads broke down 10 9
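A follow-up note on this approach (my own addition, not part of the original answer): when several workers fail at around the same time, their log records interleave, so it helps to tag each record with the process that emitted it. A minimal sketch, assuming it is placed near the top of map.py before the Pool is created (with the default fork start method the workers inherit this configuration):
import logging

# Tag every record with the emitting worker process so that concurrent
# tracebacks in the combined output are easy to tell apart.
logging.basicConfig(
    level=logging.ERROR,
    format="%(asctime)s %(processName)s %(levelname)s: %(message)s")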
I have found the traceback module very useful for multiprocessing debugging. If you pass an error back to the main thread or process, you lose all of the traceback information, so what you need to do is call traceback.format_exc in the child and pass that text back to the main thread along with the error. Below is a pattern you can use with a Pool.
import traceback
import multiprocessing as mp
import time

def mpFunctionReportError(kwargs):
    '''
    wrap any function and catch any errors from f,
    putting them in pipe instead of raising
    kwargs must contain 'queue' (multiprocessing queue)
    and 'f' function to be run
    '''
    queue = kwargs.pop('queue')
    f = kwargs.pop('f')
    rslt = None
    try:
        rslt = f(**kwargs)
        queue.put(rslt)
    except Exception as e:
        # Send the error together with its formatted traceback text
        queue.put([e, traceback.format_exc()])
    return

def doNothing(a):
    return a

def raiseException(a):
    a = 'argh'
    raise ValueError('this is bad')

manager = mp.Manager()
outQ = manager.Queue()
p = mp.Pool(processes=4)

ret = p.map_async(mpFunctionReportError,
                  iterable=[dict(f=doNothing, queue=outQ, a='pointless!') for i in xrange(4)])
ret.wait()
time.sleep(1)
for i in xrange(4):
    print(outQ.get_nowait())

ret = p.map_async(mpFunctionReportError,
                  iterable=[dict(f=raiseException, queue=outQ, a='pointless!') for i in xrange(2)])
ret.wait()
time.sleep(1)
for i in xrange(2):
    e, trace = outQ.get_nowait()
    print(e)
    print(trace)
Running this example gives:
pointless!
pointless!
pointless!
pointless!
this is bad
Traceback (most recent call last):
File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
rslt = f(**kwargs)
File "/home/john/projects/mpDemo.py", line 24, in raiseException
raise ValueError('this is bad')
ValueError: this is bad
this is bad
Traceback (most recent call last):
File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
rslt = f(**kwargs)
File "/home/john/projects/mpDemo.py", line 24, in raiseException
raise ValueError('this is bad')
ValueError: this is bad
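One design detail worth noting: this wrapper calls f(**kwargs), i.e. it unpacks keyword arguments, while the question's func takes a single dict, which is presumably why the accepted answer above defines its own adapted wrapper instead of reusing this one verbatim. If one did want to reuse it directly, a hypothetical bit of glue (not part of the original answer; the names computeOne and jobs are my own) could look like the sketch below. The function has to live at module level and be defined before its Pool is created so the forked workers can unpickle the reference to it.
Np = 10
problemIndex = 4

def computeOne(index, Np, problemIndex):
    # Same toy computation and deliberate bug as in the question
    ret = Np - index
    if index == problemIndex:
        ret = 1/(index - problemIndex)
    return (index, ret)

# Build one kwargs dict per job; 'f' and 'queue' are popped by the wrapper
jobs = [dict(f=computeOne, queue=outQ, index=i, Np=Np, problemIndex=problemIndex)
        for i in xrange(Np)]

pool2 = mp.Pool(processes=4)
ret = pool2.map_async(mpFunctionReportError, jobs)
ret.wait()
pool2.close()
# Successful jobs arrive on outQ as (index, value) tuples, failed ones as
# [error, traceback-text] pairs, just as in the demo output above.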