可迭代的多进程队列未退出
import multiprocessing.queues as queues
import multiprocessing
class I(queues.Queue):
def __init__(self, maxsize=0):
super(I, self).__init__(maxsize)
self.length = 0
def __iter__(self):
return self
def put(self, obj, block=True, timeout=None):
super(I, self).put(obj,block,timeout)
self.length += 1
def get(self, block = True, timeout = None):
self.length -= 1
return super(I, self).get(block, timeout)
def __len__(self):
return self.length
def next(self):
item = self.get()
if item == 'Done':
raise StopIteration
return item
def thisworker(item):
print 'got this item: %s' % item
return item
q=I()
q.put(1)
q.put('Done')
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
我正在尝试创建一个可迭代的队列,用于与多进程池的映射功能一起使用。我的想法是,函数 thisworker
会不断往队列里添加一些项目,直到满足某个条件,然后在队列里放入“完成”这个标记后退出(不过在这段代码里我还没实现这一点)。
但是,这段代码总是无法完成,它总是卡住。
我无法找到真正的原因。希望能得到你的帮助。
附注:我使用了 self.length
,因为从 the_pool.map
调用的 map_async
方法需要用到可迭代对象的长度,以便形成一个变量 chunksize
,这个变量将用于从池中获取任务。
1 个回答
问题在于,你把 'Done'
当成了 Queue
中一个特殊的标记,表示迭代应该停止。所以,如果你用一个 for 循环来遍历这个 Queue
,最终得到的结果只会是 1
。但是你却说 Queue
的长度是 2。这就搞乱了 map
的代码,因为它依赖这个长度来准确表示可迭代对象中的项目数量,以便知道所有结果何时从工作进程返回。
class MapResult(ApplyResult):
def __init__(self, cache, chunksize, length, callback):
ApplyResult.__init__(self, cache, callback)
...
# _number_left is used to know when the MapResult is done
self._number_left = length//chunksize + bool(length % chunksize)
所以,你需要确保长度是准确的。你可以通过几种方式做到这一点,但我建议你根本不需要在 Queue
中加载一个标记,而是使用 get_nowait
。
import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty
class I(queues.Queue):
def __init__(self, maxsize=0):
super(I, self).__init__(maxsize)
self.length = 0
... <snip>
def next(self):
try:
item = self.get_nowait()
except Empty:
raise StopIteration
return item
def thisworker(item):
print 'got this item: %s' % item
return item
q=I()
q.put(1)
the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)
另外,请注意,这种方法并不安全。如果你只从一个进程中 put
数据到 Queue
,并且在把 Queue
发送到工作进程后不再 put
,那么 length
属性才会是正确的。在 Python 3 中,这种方法也不适用,因为 multiprocessing.queues.Queue
的构造函数已经改变。
与其去扩展 multiprocessing.queues.Queue
,我建议使用内置的 iter
来遍历 Queue
:
q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None) # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned
这种方法在所有版本的 Python 中都能使用,代码量也少得多,并且是安全的。
编辑:
根据你在我回答的评论中提到的需求,我认为你使用 imap
而不是 map
会更好,这样你根本不需要知道 Queue
的长度。实际上,你无法准确确定这个长度,实际上在你迭代的时候长度可能还会增加。如果你只使用 imap
,那么类似你原来的方法就能很好地工作:
import multiprocessing
class I(object):
def __init__(self, maxsize=0):
self.q = multiprocessing.Queue(maxsize)
def __getattr__(self, attr):
if hasattr(self.q, attr):
return getattr(self.q, attr)
def __iter__(self):
return self
def next(self):
item = self.q.get()
if item == 'Done':
raise StopIteration
return item
def thisworker(item):
if item == 1:
q.put(3)
if item == 2:
q.put('Done')
print 'got this item: %s' % item
return item
q=I()
q.put(1)
q.put(2)
q.put(5)
the_pool = multiprocessing.Pool(2) # 2 workers
print list(the_pool.imap(thisworker, q))
输出:
got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]
我去掉了担心长度的代码,使用了委托而不是继承,以更好地兼容 Python 3.x。
请注意,我最初的建议,使用 iter(q.get, <sentinel>)
,在这里也仍然有效,只要你使用 imap
而不是 map
。