在concurrent.futures代码中发生死锁
我一直在尝试用 concurrent.futures.ProcessPoolExecutor
来并行处理一些代码,但总是遇到奇怪的死锁问题,而用 ThreadPoolExecutor
就没有这个问题。这里有个简单的例子:
from concurrent import futures
def test():
pass
with futures.ProcessPoolExecutor(4) as executor:
for i in range(100):
print('submitting {}'.format(i))
executor.submit(test)
在 Python 3.2.2(64位的 Ubuntu 系统)上,这段代码在提交所有任务后似乎总是会卡住——而且每当提交的任务数量超过工作进程的数量时,就会发生这种情况。如果我把 ProcessPoolExecutor
换成 ThreadPoolExecutor
,那就能顺利运行。
为了调查这个问题,我给每个未来对象加了一个回调函数,用来打印 i
的值:
from concurrent import futures
def test():
pass
with futures.ProcessPoolExecutor(4) as executor:
for i in range(100):
print('submitting {}'.format(i))
future = executor.submit(test)
def callback(f):
print('callback {}'.format(i))
future.add_done_callback(callback)
这让我更加困惑——回调函数中打印的 i
的值是调用时的值,而不是定义时的值(所以我从来没看到过 callback 0
,但却收到了很多 callback 99
)。同样,ThreadPoolExecutor
打印出的值是我预期的。
我在想这可能是个 bug,于是尝试了一个最近的开发版本的 Python。现在,代码至少看起来能结束了,但我仍然打印出了错误的 i
值。
所以,有人能解释一下吗:
在 Python 3.2 和当前开发版本之间,
ProcessPoolExecutor
发生了什么,导致这个死锁问题被修复了为什么打印出的
i
值是“错误的”
补充说明:正如 jukiewicz 在下面指出的,当然打印 i
会打印出回调被调用时的值,我不知道我当时在想什么……如果我传递一个可调用对象,并把 i
的值作为它的一个属性,那就能按预期工作。
补充信息:所有的回调函数都被执行了,所以看起来是 executor.shutdown
(由 executor.__exit__
调用)无法判断进程是否已经完成。这在当前的 Python 3.3 中似乎完全修复了,但 multiprocessing
和 concurrent.futures
似乎进行了很多更改,所以我不知道是什么修复了这个问题。由于我不能使用 3.3(它似乎与 numpy 的发布版或开发版不兼容),我尝试把它的 multiprocessing 和 concurrent 包复制到我的 3.2 安装中,这似乎也能正常工作。不过,似乎有点奇怪的是——就我所见,ProcessPoolExecutor
在最新的发布版本中完全坏掉了,但没有其他人受到影响。
1 个回答
我修改了代码,解决了两个问题。callback
函数被定义为一个闭包,这样每次调用时都会使用更新后的 i
的值。至于死锁,这可能是因为在所有任务完成之前就关闭了执行器。等待所有任务完成也能解决这个问题。
from concurrent import futures
def test(i):
return i
def callback(f):
print('callback {}'.format(f.result()))
with futures.ProcessPoolExecutor(4) as executor:
fs = []
for i in range(100):
print('submitting {}'.format(i))
future = executor.submit(test, i)
future.add_done_callback(callback)
fs.append(future)
for _ in futures.as_completed(fs): pass
更新:哦,抱歉,我没有看到你的更新,这个问题似乎已经解决了。