可迭代的多进程队列未退出

3 投票
1 回答
1213 浏览
提问于 2025-04-27 22:50
import multiprocessing.queues as queues
import multiprocessing
class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    def __iter__(self):
        return self

    def put(self, obj, block=True, timeout=None):
        super(I, self).put(obj,block,timeout)
        self.length += 1

    def get(self, block = True, timeout = None):
        self.length -= 1
        return super(I, self).get(block, timeout)

    def __len__(self):
        return self.length

    def next(self):
        item = self.get()
        if item == 'Done':
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)
q.put('Done')

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

我正在尝试创建一个可迭代的队列,用于与多进程池的映射功能一起使用。我的想法是,函数 thisworker 会不断往队列里添加一些项目,直到满足某个条件,然后在队列里放入“完成”这个标记后退出(不过在这段代码里我还没实现这一点)。

但是,这段代码总是无法完成,它总是卡住。

我无法找到真正的原因。希望能得到你的帮助。

附注:我使用了 self.length,因为从 the_pool.map 调用的 map_async 方法需要用到可迭代对象的长度,以便形成一个变量 chunksize,这个变量将用于从池中获取任务。

暂无标签

1 个回答

2

问题在于,你把 'Done' 当成了 Queue 中一个特殊的标记,表示迭代应该停止。所以,如果你用一个 for 循环来遍历这个 Queue,最终得到的结果只会是 1。但是你却说 Queue 的长度是 2。这就搞乱了 map 的代码,因为它依赖这个长度来准确表示可迭代对象中的项目数量,以便知道所有结果何时从工作进程返回。

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        ...
        # _number_left is used to know when the MapResult is done
        self._number_left = length//chunksize + bool(length % chunksize)

所以,你需要确保长度是准确的。你可以通过几种方式做到这一点,但我建议你根本不需要在 Queue 中加载一个标记,而是使用 get_nowait

import multiprocessing.queues as queues
import multiprocessing
from Queue import Empty

class I(queues.Queue):
    def __init__(self, maxsize=0):
        super(I, self).__init__(maxsize)
        self.length = 0 

    ... <snip>

    def next(self):
        try:
            item = self.get_nowait()
        except Empty:
            raise StopIteration
        return item


def thisworker(item):
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)

the_pool = multiprocessing.Pool(1)
print the_pool.map(thisworker, q)

另外,请注意,这种方法并不安全。如果你只从一个进程中 put 数据到 Queue,并且在把 Queue 发送到工作进程后不再 put,那么 length 属性才会是正确的。在 Python 3 中,这种方法也不适用,因为 multiprocessing.queues.Queue 的构造函数已经改变。

与其去扩展 multiprocessing.queues.Queue,我建议使用内置的 iter 来遍历 Queue

q = multiprocessing.Queue()
q.put(1)
q.put(2)
q.put(None)  # None is our sentinel, you could use 'Done', if you wanted
the_pool.map(thisworker, iter(q.get, None)) # This will call q.get() until None is returned

这种方法在所有版本的 Python 中都能使用,代码量也少得多,并且是安全的。

编辑:

根据你在我回答的评论中提到的需求,我认为你使用 imap 而不是 map 会更好,这样你根本不需要知道 Queue 的长度。实际上,你无法准确确定这个长度,实际上在你迭代的时候长度可能还会增加。如果你只使用 imap,那么类似你原来的方法就能很好地工作:

import multiprocessing

class I(object):
    def __init__(self, maxsize=0):
        self.q = multiprocessing.Queue(maxsize)

    def __getattr__(self, attr):
        if hasattr(self.q, attr):
            return getattr(self.q, attr)

    def __iter__(self):
        return self

    def next(self):
        item = self.q.get()
        if item == 'Done':
            raise StopIteration
        return item


def thisworker(item):
    if item == 1:
        q.put(3)
    if item == 2:
        q.put('Done')
    print 'got this item: %s' % item
    return item

q=I()

q.put(1)
q.put(2)
q.put(5)

the_pool = multiprocessing.Pool(2)  # 2 workers
print list(the_pool.imap(thisworker, q))

输出:

got this item: 1
got this item: 5
got this item: 3
got this item: 2
[1, 2, 5, 3]

我去掉了担心长度的代码,使用了委托而不是继承,以更好地兼容 Python 3.x。

请注意,我最初的建议,使用 iter(q.get, <sentinel>),在这里也仍然有效,只要你使用 imap 而不是 map

撰写回答