Python multiprocessing.Queue 在 put 和 get 时死锁

6 投票
2 回答
10644 浏览
提问于 2025-04-16 00:27

我在这段代码中遇到了死锁问题:


def _entropy_split_parallel(data_train, answers_train, weights):
    CPUS = 1 #multiprocessing.cpu_count()
    NUMBER_TASKS = len(data_train[0])
    processes = []

    multi_list = zip(data_train, answers_train, weights)

    task_queue = multiprocessing.Queue()
    done_queue = multiprocessing.Queue()

    for feature_index in xrange(NUMBER_TASKS):
        task_queue.put(feature_index)

    for i in xrange(CPUS):
        process = multiprocessing.Process(target=_worker, 
                args=(multi_list, task_queue, done_queue))
        processes.append(process)
        process.start()

    min_entropy = None
    best_feature = None
    best_split = None
    for i in xrange(NUMBER_TASKS):
        entropy, feature, split = done_queue.get()
        if (entropy < min_entropy or min_entropy == None) and entropy != None:
            best_feature = feature
            best_split = split

    for i in xrange(CPUS):
        task_queue.put('STOP')

    for process in processes:
        process.join()

    return best_feature, best_split


def _worker(multi_list, task_queue, done_queue):
    feature_index = task_queue.get()
    while feature_index != 'STOP':
        result = _entropy_split3(multi_list, feature_index)
        done_queue.put(result)
        feature_index = task_queue.get()

当我运行我的程序时,它在多次调用 _entropy_split_parallel 的时候都能正常工作,但最终会出现死锁。父进程在 done_queue.get() 这行代码上被阻塞,而工作进程则在 done_queue.put() 上被阻塞。因为在这种情况下队列总是空的,所以在 get 上被阻塞是可以理解的。但我不明白的是,为什么工作进程在 put 上也会被阻塞,因为队列显然是空的(根本没有东西)。我尝试过使用 blocktimeout 这些参数,但结果还是一样。

我使用的是 multiprocessing 的回溯版本,因为我只能用 Python 2.5。


补充:看起来我在使用 multiprocessing 模块提供的一个示例时也遇到了死锁问题。这个示例是从底部往上数第三个 在这里。 只有当我多次调用测试方法时,才会出现死锁。例如,把脚本底部改成这样:


if __name__ == '__main__':
    freeze_support()
    for x in xrange(1000):
        test()

补充:我知道这是个老问题,但测试显示在 Windows 上使用 Python 2.7 时,这个问题不再存在。我会尝试在 Linux 上运行并反馈结果。

2 个回答

4

我觉得问题出在父线程试图等待一个它传递了队列的子线程。这在多进程模块的编程指南部分中有讨论。

无论如何,我遇到了和你描述的症状一样的问题,当我调整我的逻辑,让主线程不去等待子线程时,就没有出现死锁。我的调整逻辑是先确定我需要从结果或“完成”队列中获取多少个项目(这个数量可以根据子线程的数量或工作队列中的项目数量等来预测),然后不断循环,直到所有这些项目都收集完毕。

下面是这个逻辑的“玩具”示例:

num_items_expected = figure_it_out(work_queue, num_threads)
items_received = []
while len(items_received) < num_items_expected:
    items_received.append(done_queue.get())
    time.sleep(5)

这个逻辑避免了父线程去等待子线程的需要,同时也让父线程在所有子线程完成之前保持阻塞。这种方法解决了我的死锁问题。

0

这个问题在更新后的Python版本中解决了,所以我猜之前是旧版本的问题。总之,现在已经不成问题了。

撰写回答