如何将队列引用传递给pool.map_async()管理的函数?

43 投票
2 回答
29861 浏览
提问于 2025-04-16 01:04

我想要一个长时间运行的程序,它能通过一个队列(或者类似的东西)来返回进度,这样我就可以把这些进度信息用在一个进度条对话框里。同时,我还需要在这个过程完成后得到结果。这里有个测试示例,但它失败了,报了一个错误:RuntimeError: Queue objects should only be shared between processes through inheritance

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

我已经能够通过单独的进程对象来实现这个功能(在这种情况下,我可以传递一个队列的引用),但这样我就没有办法管理我想要启动的多个进程。有没有什么更好的方法可以做到这一点?

2 个回答

8

q 设为 全局 是有效的...:

import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

如果你需要多个队列,比如为了避免不同的进程之间进度混淆,使用一个全局的队列列表是可行的(当然,每个进程需要知道在这个列表中用哪个 索引,不过这可以作为参数传递,没问题;-)。

58

下面的代码看起来是可以正常工作的:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

请注意,这里的队列是通过 manager.Queue() 获取的,而不是 multiprocessing.Queue()。感谢 Alex 指出这一点。

撰写回答