我可以在Pool.imap调用的函数中使用多进程队列吗?

35 投票
2 回答
20406 浏览
提问于 2025-04-16 04:47

我在用 Python 2.7,想要把一些占用 CPU 很多的任务放在独立的进程中运行。我希望能把进程的当前状态通过消息发送回父进程,这样父进程就能知道进程的情况。看起来 multiprocessing 的队列(Queue)很适合这个需求,但我就是搞不定怎么用。

这是我一个基本的工作示例,不过没有用到队列。

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我试过用好几种方法传递队列,但都出现了错误信息:“RuntimeError: Queue objects should only be shared between processes through inheritance”(队列对象只能通过继承在进程间共享)。这是我根据之前找到的一个答案尝试的其中一种方法。(我在用 Pool.map_async 和 Pool.imap 时也遇到同样的问题)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

最后,那个把队列设为全局的办法并没有产生任何消息,它只是卡住了。

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我知道直接用 multiprocessing.Process 可能会有效,还有其他库也能做到这一点,但我不想放弃标准库的函数,因为我觉得它们很合适,除非我确认不是因为我知识不足才无法利用它们。

谢谢。

2 个回答

1

在使用 fork 启动方法(也就是在Unix系统上),你不需要使用那个初始化的技巧,像顶层答案里说的那样。

只要把 mp.Queue 定义为全局变量,子进程就会正确地继承它。

原作者的例子在Linux上使用Python 3.9.7运行得很好(代码稍微调整了一下):

import multiprocessing as mp
import time

q = mp.Queue()


def f(x):
    q.put(str(x))
    return x * x


def main():
    pool = mp.Pool(5)
    pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    for _ in range(1, 6):
        print(q.get())

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

输出:

2
1
3
4
5

虽然已经过去12年了,但我想确保任何遇到这个问题的Linux用户都知道,顶层答案里的技巧只有在你不能使用fork的时候才需要。

57

诀窍是把队列作为参数传递给初始化器。这似乎适用于所有的池调度方法。

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()

撰写回答