多进程队列最大尺寸限制为32767
我正在尝试用Python 2.6(在OSX系统上)写一个程序,想要使用多进程功能,并且想在队列中放入超过默认的32767个项目。
from multiprocessing import Queue
Queue(2**15) # raises OSError
Queue(32767)
这个写法没问题,但如果我试着用更大的数字,比如 Queue(32768)
,就会出现 OSError: [Errno 22] Invalid argument
的错误。
有没有什么办法可以解决这个问题呢?
2 个回答
我之前已经回答过原始问题了,但我想补充一下,Redis 的列表功能非常可靠,而且它的 Python 模块使用起来也很简单,特别适合用来实现类似队列的对象。这种方式的好处是可以在多个节点(跨网络)上扩展,而不仅仅是在多个进程中。
基本上,你只需要为你的队列选择一个名字(字符串),然后让生产者把数据推送到这个队列里,工作者(任务消费者)则循环从这个队列中取出数据。
Redis 的 BLPOP 和 BRPOP 命令都需要一个键(列表/队列)的列表和一个可选的超时时间。它们会返回一个元组(键,值)或者在超时的情况下返回 None。所以你可以很容易地写出一个事件驱动的系统,这个系统的结构和你熟悉的 select() 很相似(但层次更高)。你唯一需要注意的是缺失的键和无效的键类型(当然,要把你的队列操作包裹在异常处理器里)。如果有其他应用在你的共享 Redis 服务器上停止了,删除了你正在使用的键,或者把你用作队列的键替换成字符串/整数或其他类型的值……那你就遇到其他问题了。 :)
这个模型的另一个优点是,Redis 会把数据持久化到磁盘上。所以如果你选择这样做,你的工作队列在系统重启后也能继续存在。
(当然,如果你真的想的话,也可以在 SQLlite 或其他 SQL 系统中实现一个简单的队列;只需使用某种自增索引来进行排序,并用一列来标记每个项目是否“完成”(被消费);但这样做的复杂性会比直接使用 Redis 提供的功能要高一些)。
一种方法是用一个自定义的类来包装你的 multiprocessing.Queue
(只在生产者那一侧,或者从消费者的角度来看是透明的)。这样,你可以把要发送的项目放到你包装的 Queue
对象里,并且只有在有空位的时候,才把本地队列(Python 的 list()
对象)里的东西放进 multiprocessing.Queue
。同时,你需要处理一些异常,以防 Queue
满了时进行限制。
这可能是最简单的方法,因为它对你其他代码的影响最小。这个自定义类的行为应该和队列一样,但它会把底层的 multiprocessing.Queue
隐藏在你的抽象后面。
(一种方法可能是让你的生产者使用线程,一个线程负责从一个线程安全的 Queue
管理发送到你的 multiprocessing.Queue
,而其他线程则只是往这个线程安全的 Queue
里放东西。)