多进程队列最大尺寸限制为32767

14 投票
2 回答
16321 浏览
提问于 2025-04-16 17:03

我正在尝试用Python 2.6(在OSX系统上)写一个程序,想要使用多进程功能,并且想在队列中放入超过默认的32767个项目。

from multiprocessing import Queue
Queue(2**15) # raises OSError

Queue(32767) 这个写法没问题,但如果我试着用更大的数字,比如 Queue(32768),就会出现 OSError: [Errno 22] Invalid argument 的错误。

有没有什么办法可以解决这个问题呢?

2 个回答

2

我之前已经回答过原始问题了,但我想补充一下,Redis 的列表功能非常可靠,而且它的 Python 模块使用起来也很简单,特别适合用来实现类似队列的对象。这种方式的好处是可以在多个节点(跨网络)上扩展,而不仅仅是在多个进程中。

基本上,你只需要为你的队列选择一个名字(字符串),然后让生产者把数据推送到这个队列里,工作者(任务消费者)则循环从这个队列中取出数据。

Redis 的 BLPOP 和 BRPOP 命令都需要一个键(列表/队列)的列表和一个可选的超时时间。它们会返回一个元组(键,值)或者在超时的情况下返回 None。所以你可以很容易地写出一个事件驱动的系统,这个系统的结构和你熟悉的 select() 很相似(但层次更高)。你唯一需要注意的是缺失的键和无效的键类型(当然,要把你的队列操作包裹在异常处理器里)。如果有其他应用在你的共享 Redis 服务器上停止了,删除了你正在使用的键,或者把你用作队列的键替换成字符串/整数或其他类型的值……那你就遇到其他问题了。 :)

这个模型的另一个优点是,Redis 会把数据持久化到磁盘上。所以如果你选择这样做,你的工作队列在系统重启后也能继续存在。

(当然,如果你真的想的话,也可以在 SQLlite 或其他 SQL 系统中实现一个简单的队列;只需使用某种自增索引来进行排序,并用一列来标记每个项目是否“完成”(被消费);但这样做的复杂性会比直接使用 Redis 提供的功能要高一些)。

4

一种方法是用一个自定义的类来包装你的 multiprocessing.Queue(只在生产者那一侧,或者从消费者的角度来看是透明的)。这样,你可以把要发送的项目放到你包装的 Queue 对象里,并且只有在有空位的时候,才把本地队列(Python 的 list() 对象)里的东西放进 multiprocessing.Queue。同时,你需要处理一些异常,以防 Queue 满了时进行限制。

这可能是最简单的方法,因为它对你其他代码的影响最小。这个自定义类的行为应该和队列一样,但它会把底层的 multiprocessing.Queue 隐藏在你的抽象后面。

(一种方法可能是让你的生产者使用线程,一个线程负责从一个线程安全的 Queue 管理发送到你的 multiprocessing.Queue,而其他线程则只是往这个线程安全的 Queue 里放东西。)

撰写回答