如何在多个Python多进程队列中使用"select"?
在同一台电脑上,有两个(多进程的)队列,你想要等着某个东西变得可用,但又不想让程序一直在那儿转圈圈。有什么好的方法吗?
10 个回答
4
我不太确定在Windows上使用多进程队列的选择功能效果如何。因为在Windows上,选择功能是用来监听网络连接的,而不是文件句柄,所以我怀疑可能会出现一些问题。
我的解决办法是为每个队列创建一个线程,让它们在阻塞状态下监听,然后把结果放到一个主线程监听的单一队列里,实际上就是把多个队列合并成一个。
我用来实现这个方法的代码是:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
下面的测试程序展示了如何使用它:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
希望这能帮到你。
Tony Wallace
33
其实你可以在select.select中使用multiprocessing.Queue对象。也就是说,
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
只有当队列准备好可以读取时,它才会被选中。
不过这方面没有什么文档说明。我是通过查看multiprocessing.queue库的源代码来发现这个的(在Linux系统中,通常路径是/usr/lib/python2.6/multiprocessing/queue.py)。
使用Queue.Queue时,我没有找到任何聪明的方法来做到这一点(我真的很想找到)。
15
目前看起来还没有官方的解决办法。或者说,至少根据这个链接来看是这样的:
你可以试试这个帖子里提到的方法——访问底层的管道文件句柄:
然后再使用选择(select)。