从线程队列中获取所有项

20 投票
6 回答
42501 浏览
提问于 2025-04-11 09:20

我有一个线程负责把结果写入一个队列。

在另一个线程(图形界面)中,我会定期(在空闲事件中)检查队列里有没有结果,像这样:

def queue_get_all(q):
    items = []
    while 1:
        try:
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

这样做好吗?

补充说明:

我之所以问这个,是因为有时候等待的线程会卡住几秒钟,没法取出新的结果。

这个“卡住”的问题其实是因为我在空闲事件处理程序中进行处理,但没有确保这些事件是通过调用 wx.WakeUpIdle 生成的,正如推荐的那样。

6 个回答

19

我觉得从队列中取出所有项目最简单的方法就是这样:

def get_all_queue_result(queue):

    result_list = []
    while not queue.empty():
        result_list.append(queue.get())

    return result_list
23

如果你总是从队列中取出所有可用的项目,那使用队列还有什么实际意义呢?不如直接用一个带锁的列表来处理更简单,对吧?

from __future__ import with_statement
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

如果你是一个一个地取出项目,并且利用了空队列时的阻塞特性,那就应该使用队列。不过从你的情况来看,似乎更简单的方法可能更适合你。

[编辑2] 我之前没注意到你是从一个空闲的循环中轮询队列,而且从你的更新中我看到,问题和竞争无关,所以下面的方法对你的问题并不太适用。我把它留着,以防有人觉得这个阻塞的变体有用:

如果你确实想要在得到至少一个结果之前阻塞,可以修改上面的代码,让它等待数据通过生产者线程的信号变得可用。例如:

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items
9

如果 get_nowait() 这个调用因为列表为空而不返回导致暂停,我会感到非常惊讶。

可能是你在检查之间发送了很多(也许是比较大的?)项目,这样接收线程就需要从 Queue 中提取大量数据?你可以试着限制每次获取的数量:

def queue_get_all(q):
    items = []
    maxItemsToRetrieve = 10
    for numOfItemsRetrieved in range(0, maxItemsToRetrieve):
        try:
            if numOfItemsRetrieved == maxItemsToRetrieve:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

这样可以限制接收线程每次最多提取10个项目。

撰写回答