Python multiprocessing apply_async在Windows 7上未返回结果
我正在尝试一个非常简单的多进程示例:
import multiprocessing as mp
def cube(x):
return x**3
pool = mp.Pool(processes=2)
results = [pool.apply_async(cube, args=x) for x in range(1,7)]
但是,在我的Windows电脑上,我无法得到结果(在Ubuntu 12.04LTS上运行得很好)。
如果我检查一下results
,我看到以下内容:
[<multiprocessing.pool.ApplyResult object at 0x01FF0910>,
<multiprocessing.pool.ApplyResult object at 0x01FF0950>,
<multiprocessing.pool.ApplyResult object at 0x01FF0990>,
<multiprocessing.pool.ApplyResult object at 0x01FF09D0>,
<multiprocessing.pool.ApplyResult object at 0x01FF0A10>,
<multiprocessing.pool.ApplyResult object at 0x01FF0A50>]
如果我运行results[0].ready()
,我总是得到False
。
如果我运行results[0].get()
,Python解释器就会卡住,等待结果,但结果就是得不到。
这个示例简单得不能再简单了,所以我在想这可能是和操作系统有关的低级错误(我用的是Windows 7)。不过,也许其他人有更好的想法?
2 个回答
我在Windows上使用Anaconda里的Spyder运行Python 3,但这个小技巧对我没用。我尝试了很多方法,但都没有效果。我找到了我问题的原因,和dano在他的笔记中提到的一样。不过经过一天的搜索,我找到了一些解决办法,帮助我在Windows机器上运行了相同的代码。这个网站给了我解决方案:
http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html
因为我使用的是Python 3,所以我稍微修改了程序,变成了这样:
from types import FunctionType
import marshal
def _applicable(*args, **kwargs):
name = kwargs['__pw_name']
code = marshal.loads(kwargs['__pw_code'])
gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
defs = marshal.loads(kwargs['__pw_defs'])
clsr = marshal.loads(kwargs['__pw_clsr'])
fdct = marshal.loads(kwargs['__pw_fdct'])
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
del kwargs['__pw_name']
del kwargs['__pw_code']
del kwargs['__pw_defs']
del kwargs['__pw_clsr']
del kwargs['__pw_fdct']
return func(*args, **kwargs)
def make_applicable(f, *args, **kwargs):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
kwargs['__pw_name'] = f.__name__ # edited
kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited
kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited
kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited
kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited
return _applicable, args, kwargs
def _mappable(x):
x,name,code,defs,clsr,fdct = x
code = marshal.loads(code)
gbls = globals() #gbls = marshal.loads(gbls)
defs = marshal.loads(defs)
clsr = marshal.loads(clsr)
fdct = marshal.loads(fdct)
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
return func(x)
def make_mappable(f, iterable):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
name = f.__name__ # edited
code = marshal.dumps(f.__code__) # edited
defs = marshal.dumps(f.__defaults__) # edited
clsr = marshal.dumps(f.__closure__) # edited
fdct = marshal.dumps(f.__dict__) # edited
return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
在这个函数之后,上面的代码也稍微改了一下,变成了这样:
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def cube(x):
return x**3
if __name__ == "__main__":
pool = Pool(processes=2)
results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
print([result.get(timeout=10) for result in results])
最后我得到了这样的输出:
[1, 8, 27, 64, 125, 216]
我觉得这篇帖子可能对一些Windows用户有帮助。
这里有几个错误。首先,在Windows系统上运行时,你必须把Pool
放在if __name__ == "__main__":
这个保护语句里面,具体可以参考这篇文档。其次,即使你只传一个参数,也必须把args
这个关键字参数传递一个序列。所以把这些结合起来:
import multiprocessing as mp
def cube(x):
return x**3
if __name__ == "__main__":
pool = mp.Pool(processes=2)
results = [pool.apply_async(cube, args=(x,)) for x in range(1,7)]
print([result.get() for result in results])
输出:
[1, 8, 27, 64, 125, 216]
补充:
哦,正如moarningsun提到的,multiprocessing
在交互式解释器中效果不好:
注意
这个包的功能要求
__main__
模块能够被子进程导入。这在编程指南中有说明,但在这里提一下也很重要。这意味着一些例子,比如multiprocessing.Pool
的例子,在交互式解释器中是无法工作的。
所以你需要把代码当作脚本来执行,才能正确测试它。