在多个进程之间共享结果队列

2024-04-20 05:46:41 发布

您现在位置:Python中文网/ 问答频道 /正文

multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process开头的进程。但是,如何与以apply_async启动的异步工作进程共享队列?我不需要动态连接或其他任何东西,只需要让工人(反复)将结果报告给base。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

失败的原因是: RuntimeError: Queue objects should only be shared between processes through inheritance。 我理解这意味着什么,我理解继承的建议,而不是要求酸洗/脱皮(以及所有特殊的Windows限制)。但是,我如何以一种有效的方式传递队列呢?我找不到一个例子,我试过几种不同的方法,但都失败了。请帮忙?


Tags: 模块name文档async队列queue进程动态
2条回答

尝试使用multiprocessing.Manager来管理队列,并使不同的工作人员可以访问它。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))

multiprocessing.Pool已经有一个共享结果队列,不需要另外涉及一个Manager.QueueManager.Queue是一个位于引擎盖下的^{}(多线程队列),位于单独的服务器进程上,并通过代理公开。与池的内部队列相比,这会增加额外的开销。与依赖池的本机结果处理相反,Manager.Queue中的结果也不能保证被排序。

工作进程不是以.apply_async()开始的,这在实例化Pool时已经发生。什么是开始的 当你调用pool.apply_async()是一个新的“作业”。池的工作进程在引擎盖下运行multiprocessing.pool.worker-函数。此函数负责处理通过池的内部Pool._inqueue传输的新“任务”,并通过Pool._outqueue将结果发送回父级。指定的func将在multiprocessing.pool.worker内执行。func只需return一些内容,结果将自动发送回父级。

.apply_async()立即(异步)返回一个^{}对象(用于ApplyResult的别名)。您需要对该对象调用.get()(正在阻塞)以接收实际结果。另一个选择是注册一个callback函数,一旦结果就绪,它就会被触发。

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

示例输出:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

注意:为.get()指定timeout参数不会停止工作进程中任务的实际处理,它只会通过引发multiprocessing.TimeoutError来解除阻止等待的父进程。

相关问题 更多 >