传达队列结束

17 投票
6 回答
12710 浏览
提问于 2025-04-16 03:30

我正在学习使用队列模块,但对如何让队列的消费者线程知道队列已经完成有点困惑。理想情况下,我希望在消费者线程中使用 get() 方法,如果队列被标记为“完成”,它能抛出一个异常。有没有比在队列最后加一个特殊值来标记最后一项更好的方法来传达这个信息?

6 个回答

8

哨兵(sentinel)是一种自然的方式来关闭队列,但有几个地方需要注意。

首先,要记住你可能有多个消费者(也就是处理队列数据的部分),所以你需要为每个正在运行的消费者发送一个哨兵,并确保每个消费者只消费一个哨兵,这样才能确保每个消费者都能收到它的关闭信号。

其次,要明白队列(Queue)定义了一个接口,代码应该尽量不依赖于具体的队列实现。你可能在使用一个优先队列(PriorityQueue),或者其他某种类,它们虽然都实现了相同的接口,但返回的值顺序可能不同。

不幸的是,处理这两种情况都比较困难。为了应对不同队列的一般情况,正在关闭的消费者必须在收到关闭哨兵后继续消费值,直到队列为空。这意味着它可能会消费到其他线程的哨兵。这是队列接口的一个弱点:它应该有一个Queue.shutdown的调用,让所有消费者抛出异常,但这个功能是缺失的。

所以,实际上:

  • 如果你确定只使用普通队列,那么每个线程只需发送一个哨兵。
  • 如果你可能在使用优先队列,确保哨兵的优先级最低。
9

队列本身并没有“完成”或“结束”的概念。它们可以无限期地使用。当你用完队列时,确实需要在最后放一个 None 或其他特殊值,并编写逻辑来检查这个值,就像你所描述的那样。理想的做法可能是创建一个队列的子类。

想了解更多关于队列的知识,可以查看 这个链接

13

原文(大部分内容已更改;请参见下面的更新)

根据一些建议(谢谢!)来自Glenn Maynard和其他人,我决定创建一个继承自Queue.Queue的类,里面实现了一个close方法。这个类以一个简单的(未打包的)模块的形式提供。等我有更多时间时,会把它整理得更好并打包。目前这个模块只包含CloseableQueue类和Closed异常类。我计划扩展它,增加Queue.LifoQueueQueue.PriorityQueue的子类。

目前这个模块还处于初步阶段,虽然它通过了测试,但我还没有实际用它做过什么。你的使用体验可能会有所不同。我会持续更新这个回答,带来一些激动人心的消息。

CloseableQueue类和Glenn的建议有点不同,关闭队列后将不再允许后续的put操作,但在队列清空之前,仍然可以进行get操作。我觉得这样设计最合理;似乎可以将清空队列的功能作为一个独立的混合功能添加,这样就不会影响关闭功能。所以基本上,通过关闭队列,你表示最后一个元素已经被put进来了。你也可以通过在最后一次put调用时传入last=True来实现原子操作。之后的put调用,以及在队列清空后进行的get调用,都会抛出Closed异常。

这个功能主要适用于单个生产者为一个或多个消费者生成数据的场景,但在多个消费者等待特定项目或一组项目的情况下也可能有用。特别是,它并没有提供一种方法来判断所有生产者是否都完成了生产。要实现这一点,需要提供某种方式来注册生产者(比如.open()?),以及一种方式来表示生产者注册已经关闭。

欢迎提出建议和代码审查。我并没有写很多并发代码,但希望测试套件足够全面,代码通过测试可以说明它的质量,而不是测试套件本身的问题。我能够重用队列模块测试套件中的一部分代码:这个文件本身包含在这个模块中,并作为各种子类和例程的基础,包括回归测试。这可能(希望)有助于避免在测试方面的完全无能。代码本身只是对Queue.getQueue.put进行了相对较小的修改,并添加了closeclosed方法。

我有意避免在代码和测试套件中使用任何新奇的东西,比如上下文管理器,以保持代码与队列模块的向后兼容性,而队列模块本身确实是相当向后兼容的。我可能会在某个时候添加__enter____exit__方法;否则,contextlib的closing函数应该适用于CloseableQueue实例。

*:这里我宽松地使用“混合”这个词。由于Queue模块的类是旧式的,混合类需要使用类工厂函数来混合;有一些限制;在Guido禁止的地方提供无效。

更新

CloseableQueue模块现在提供了CloseableLifoQueueCloseablePriorityQueue类。我还添加了一些便利函数来支持迭代。仍然需要将其重新整理为一个合适的包。现在有一个类工厂函数,可以方便地对其他Queue.Queue派生类进行子类化。

更新 2

CloseableQueue现在可以通过PyPI获取,例如使用

$ easy_install CloseableQueue

欢迎评论和批评,特别是来自这个回答的匿名反对者。

撰写回答