如何在多个Python多进程队列中使用"select"?

33 投票
10 回答
22161 浏览
提问于 2025-04-15 12:53

在同一台电脑上,有两个(多进程的)队列,你想要等着某个东西变得可用,但又不想让程序一直在那儿转圈圈。有什么好的方法吗?

10 个回答

4

我不太确定在Windows上使用多进程队列的选择功能效果如何。因为在Windows上,选择功能是用来监听网络连接的,而不是文件句柄,所以我怀疑可能会出现一些问题。

我的解决办法是为每个队列创建一个线程,让它们在阻塞状态下监听,然后把结果放到一个主线程监听的单一队列里,实际上就是把多个队列合并成一个。

我用来实现这个方法的代码是:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

下面的测试程序展示了如何使用它:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

希望这能帮到你。

Tony Wallace

33

其实你可以在select.select中使用multiprocessing.Queue对象。也就是说,

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

只有当队列准备好可以读取时,它才会被选中。

不过这方面没有什么文档说明。我是通过查看multiprocessing.queue库的源代码来发现这个的(在Linux系统中,通常路径是/usr/lib/python2.6/multiprocessing/queue.py)。

使用Queue.Queue时,我没有找到任何聪明的方法来做到这一点(我真的很想找到)。

15

目前看起来还没有官方的解决办法。或者说,至少根据这个链接来看是这样的:

你可以试试这个帖子里提到的方法——访问底层的管道文件句柄:

然后再使用选择(select)。

撰写回答