Python 2.7.6中多进程下Queue.PriorityQueue的奇怪行为

16 投票
1 回答
7454 浏览
提问于 2025-04-18 17:23

正如标题所说,我正在尝试在多进程中使用优先队列。更具体地说,我想创建一个共享的优先队列,写了一些代码,但运行的结果并不是我预期的那样。

看看这段代码:

import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue


def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)

    print "worker", queue.qsize()


pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

我得到了以下输出:

worker 100
main 0

这是怎么回事?我该如何正确地实现我想要的功能呢?谢谢。

1 个回答

30

问题并不是说在这种情况下不能进行序列化(picklable)——如果你使用的是类Unix系统,队列可以直接传递给子进程,而不需要序列化。(在Windows上,这里可能会出现序列化错误。)根本问题在于你没有使用安全的进程间队列。只有在 multiprocessing 模块中的 Queue 对象 才能在不同的进程之间使用。不幸的是,目前没有现成的 PriorityQueue 实现。不过,你可以通过注册一个 PriorityQueuemultiprocessing.Manager 类,轻松创建一个,像这样:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue


class MyManager(SyncManager):
    pass
MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()


m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

输出:

worker 100
main 100

需要注意的是,这种方式的性能可能没有标准的 multiprocessing.Queue 子类那么好。基于 ManagerPriorityQueue 是通过创建一个 Manager 服务器进程来实现的,这个进程实际上包含一个普通的 PriorityQueue,然后为你的主进程和工作进程提供使用 IPC(进程间通信)来读写队列的 Proxy 对象。普通的 multiprocessing.Queue 只是直接通过 Pipe 来读写数据。如果你对此有顾虑,可以尝试通过继承或委托 multiprocessing.Queue 来实现自己的 multiprocessing.PriorityQueue。不过,这样做可能不太值得。

撰写回答