Python multiprocessing.Queue 在 put 和 get 时死锁
我在这段代码中遇到了死锁问题:
def _entropy_split_parallel(data_train, answers_train, weights):
CPUS = 1 #multiprocessing.cpu_count()
NUMBER_TASKS = len(data_train[0])
processes = []
multi_list = zip(data_train, answers_train, weights)
task_queue = multiprocessing.Queue()
done_queue = multiprocessing.Queue()
for feature_index in xrange(NUMBER_TASKS):
task_queue.put(feature_index)
for i in xrange(CPUS):
process = multiprocessing.Process(target=_worker,
args=(multi_list, task_queue, done_queue))
processes.append(process)
process.start()
min_entropy = None
best_feature = None
best_split = None
for i in xrange(NUMBER_TASKS):
entropy, feature, split = done_queue.get()
if (entropy < min_entropy or min_entropy == None) and entropy != None:
best_feature = feature
best_split = split
for i in xrange(CPUS):
task_queue.put('STOP')
for process in processes:
process.join()
return best_feature, best_split
def _worker(multi_list, task_queue, done_queue):
feature_index = task_queue.get()
while feature_index != 'STOP':
result = _entropy_split3(multi_list, feature_index)
done_queue.put(result)
feature_index = task_queue.get()
当我运行我的程序时,它在多次调用 _entropy_split_parallel
的时候都能正常工作,但最终会出现死锁。父进程在 done_queue.get()
这行代码上被阻塞,而工作进程则在 done_queue.put()
上被阻塞。因为在这种情况下队列总是空的,所以在 get
上被阻塞是可以理解的。但我不明白的是,为什么工作进程在 put
上也会被阻塞,因为队列显然是空的(根本没有东西)。我尝试过使用 block
和 timeout
这些参数,但结果还是一样。
我使用的是 multiprocessing 的回溯版本,因为我只能用 Python 2.5。
补充:看起来我在使用 multiprocessing 模块提供的一个示例时也遇到了死锁问题。这个示例是从底部往上数第三个 在这里。 只有当我多次调用测试方法时,才会出现死锁。例如,把脚本底部改成这样:
if __name__ == '__main__':
freeze_support()
for x in xrange(1000):
test()
补充:我知道这是个老问题,但测试显示在 Windows 上使用 Python 2.7 时,这个问题不再存在。我会尝试在 Linux 上运行并反馈结果。
2 个回答
我觉得问题出在父线程试图等待一个它传递了队列的子线程。这在多进程模块的编程指南部分中有讨论。
无论如何,我遇到了和你描述的症状一样的问题,当我调整我的逻辑,让主线程不去等待子线程时,就没有出现死锁。我的调整逻辑是先确定我需要从结果或“完成”队列中获取多少个项目(这个数量可以根据子线程的数量或工作队列中的项目数量等来预测),然后不断循环,直到所有这些项目都收集完毕。
下面是这个逻辑的“玩具”示例:
num_items_expected = figure_it_out(work_queue, num_threads)
items_received = []
while len(items_received) < num_items_expected:
items_received.append(done_queue.get())
time.sleep(5)
这个逻辑避免了父线程去等待子线程的需要,同时也让父线程在所有子线程完成之前保持阻塞。这种方法解决了我的死锁问题。
这个问题在更新后的Python版本中解决了,所以我猜之前是旧版本的问题。总之,现在已经不成问题了。