如何检查任务是否已在Python队列中?

21 投票
13 回答
14278 浏览
提问于 2025-04-15 15:08

我正在用Python写一个简单的爬虫,使用了线程和队列模块。我会抓取一个网页,检查里面的链接,然后把这些链接放进一个队列里。当某个线程处理完一个页面后,它就会从队列里拿下一个页面来处理。我用一个数组来记录我已经访问过的页面,以此来过滤我放进队列的链接。但是如果有多个线程同时工作,它们可能会在不同的页面上抓到相同的链接,这样就会把重复的链接放进队列。那么,我该怎么判断一个链接是否已经在队列里,以避免重复放入呢?

13 个回答

3

接下来是对Lukáš Lalinský后来的一个解决方案的改进。重要的区别在于,put这个方法被重新定义了,这样可以确保unfinished_tasks的准确性,并且join能够按预期工作。

from queue import Queue

class UniqueQueue(Queue):

    def _init(self, maxsize):
        self.all_items = set()
        Queue._init(self, maxsize)

    def put(self, item, block=True, timeout=None):
        if item not in self.all_items:
            self.all_items.add(item)
            Queue.put(self, item, block, timeout)
4

如果不重写put方法,调用join时会一直卡住,永远不会结束。你可以在这个链接查看相关代码:https://github.com/python/cpython/blob/master/Lib/queue.py#L147

class UniqueQueue(Queue):

    def put(self, item, block=True, timeout=None):
        if item not in self.queue: # fix join bug
            Queue.put(self, item, block, timeout)

    def _init(self, maxsize):
        self.queue = set()

    def _put(self, item):
        self.queue.add(item)

    def _get(self):
        return self.queue.pop()
22

如果你不在意处理项目的顺序,我建议你试试一个使用了setQueue子类:

class SetQueue(Queue):

    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = set()

    def _put(self, item):
        self.queue.add(item)

    def _get(self):
        return self.queue.pop()

正如保罗·麦圭尔提到的,这样做可以在一个项目被移除后再次添加重复的项目,只要它还没有被加入到“已处理”的集合中。为了解决这个问题,你可以在Queue实例中同时存储这两个集合,但因为你使用较大的集合来检查项目是否已经处理过,所以你也可以直接回到queue,这样可以正确地排列请求。

class SetQueue(Queue):

    def _init(self, maxsize):
        Queue._init(self, maxsize) 
        self.all_items = set()

    def _put(self, item):
        if item not in self.all_items:
            Queue._put(self, item) 
            self.all_items.add(item)

这样做的好处是,与单独使用一个集合相比,Queue的方法是线程安全的,这样你就不需要额外的锁来检查另一个集合。

撰写回答