Python 2.7.6中多进程下Queue.PriorityQueue的奇怪行为
正如标题所说,我正在尝试在多进程中使用优先队列。更具体地说,我想创建一个共享的优先队列,写了一些代码,但运行的结果并不是我预期的那样。
看看这段代码:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue
def worker(queue):
lock = Lock()
with lock:
for i in range(100):
queue.put(i)
print "worker", queue.qsize()
pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
我得到了以下输出:
worker 100
main 0
这是怎么回事?我该如何正确地实现我想要的功能呢?谢谢。
1 个回答
30
问题并不是说在这种情况下不能进行序列化(picklable)——如果你使用的是类Unix系统,队列可以直接传递给子进程,而不需要序列化。(在Windows上,这里可能会出现序列化错误。)根本问题在于你没有使用安全的进程间队列。只有在 multiprocessing
模块中的 Queue
对象 才能在不同的进程之间使用。不幸的是,目前没有现成的 PriorityQueue
实现。不过,你可以通过注册一个 PriorityQueue
和 multiprocessing.Manager
类,轻松创建一个,像这样:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue
class MyManager(SyncManager):
pass
MyManager.register("PriorityQueue", PriorityQueue) # Register a shared PriorityQueue
def Manager():
m = MyManager()
m.start()
return m
def worker(queue):
print(queue)
for i in range(100):
queue.put(i)
print "worker", queue.qsize()
m = Manager()
pr_queue = m.PriorityQueue() # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
输出:
worker 100
main 100
需要注意的是,这种方式的性能可能没有标准的 multiprocessing.Queue
子类那么好。基于 Manager
的 PriorityQueue
是通过创建一个 Manager
服务器进程来实现的,这个进程实际上包含一个普通的 PriorityQueue
,然后为你的主进程和工作进程提供使用 IPC(进程间通信)来读写队列的 Proxy
对象。普通的 multiprocessing.Queue
只是直接通过 Pipe
来读写数据。如果你对此有顾虑,可以尝试通过继承或委托 multiprocessing.Queue
来实现自己的 multiprocessing.PriorityQueue
。不过,这样做可能不太值得。