在concurrent.futures代码中发生死锁

6 投票
1 回答
6085 浏览
提问于 2025-04-17 12:41

我一直在尝试用 concurrent.futures.ProcessPoolExecutor 来并行处理一些代码,但总是遇到奇怪的死锁问题,而用 ThreadPoolExecutor 就没有这个问题。这里有个简单的例子:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        executor.submit(test)

在 Python 3.2.2(64位的 Ubuntu 系统)上,这段代码在提交所有任务后似乎总是会卡住——而且每当提交的任务数量超过工作进程的数量时,就会发生这种情况。如果我把 ProcessPoolExecutor 换成 ThreadPoolExecutor,那就能顺利运行。

为了调查这个问题,我给每个未来对象加了一个回调函数,用来打印 i 的值:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test)

        def callback(f):
            print('callback {}'.format(i))
        future.add_done_callback(callback)

这让我更加困惑——回调函数中打印的 i 的值是调用时的值,而不是定义时的值(所以我从来没看到过 callback 0,但却收到了很多 callback 99)。同样,ThreadPoolExecutor 打印出的值是我预期的。

我在想这可能是个 bug,于是尝试了一个最近的开发版本的 Python。现在,代码至少看起来能结束了,但我仍然打印出了错误的 i 值。

所以,有人能解释一下吗:

  • 在 Python 3.2 和当前开发版本之间,ProcessPoolExecutor 发生了什么,导致这个死锁问题被修复了

  • 为什么打印出的 i 值是“错误的”

补充说明:正如 jukiewicz 在下面指出的,当然打印 i 会打印出回调被调用时的值,我不知道我当时在想什么……如果我传递一个可调用对象,并把 i 的值作为它的一个属性,那就能按预期工作。

补充信息:所有的回调函数都被执行了,所以看起来是 executor.shutdown(由 executor.__exit__ 调用)无法判断进程是否已经完成。这在当前的 Python 3.3 中似乎完全修复了,但 multiprocessingconcurrent.futures 似乎进行了很多更改,所以我不知道是什么修复了这个问题。由于我不能使用 3.3(它似乎与 numpy 的发布版或开发版不兼容),我尝试把它的 multiprocessing 和 concurrent 包复制到我的 3.2 安装中,这似乎也能正常工作。不过,似乎有点奇怪的是——就我所见,ProcessPoolExecutor 在最新的发布版本中完全坏掉了,但没有其他人受到影响。

1 个回答

3

我修改了代码,解决了两个问题。callback 函数被定义为一个闭包,这样每次调用时都会使用更新后的 i 的值。至于死锁,这可能是因为在所有任务完成之前就关闭了执行器。等待所有任务完成也能解决这个问题。

from concurrent import futures

def test(i):
    return i

def callback(f):
    print('callback {}'.format(f.result()))


with futures.ProcessPoolExecutor(4) as executor:
    fs = []
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test, i)
        future.add_done_callback(callback)
        fs.append(future)

    for _ in futures.as_completed(fs): pass

更新:哦,抱歉,我没有看到你的更新,这个问题似乎已经解决了。

撰写回答