我可以在Pool.imap调用的函数中使用多进程队列吗?
我在用 Python 2.7,想要把一些占用 CPU 很多的任务放在独立的进程中运行。我希望能把进程的当前状态通过消息发送回父进程,这样父进程就能知道进程的情况。看起来 multiprocessing 的队列(Queue)很适合这个需求,但我就是搞不定怎么用。
这是我一个基本的工作示例,不过没有用到队列。
import multiprocessing as mp
import time
def f(x):
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print str(results.next())
pool.close()
pool.join()
if __name__ == '__main__':
main()
我试过用好几种方法传递队列,但都出现了错误信息:“RuntimeError: Queue objects should only be shared between processes through inheritance”(队列对象只能通过继承在进程间共享)。这是我根据之前找到的一个答案尝试的其中一种方法。(我在用 Pool.map_async 和 Pool.imap 时也遇到同样的问题)
import multiprocessing as mp
import time
def f(args):
x = args[0]
q = args[1]
q.put(str(x))
time.sleep(0.1)
return x*x
def main():
q = mp.Queue()
pool = mp.Pool()
results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))
print str(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
最后,那个把队列设为全局的办法并没有产生任何消息,它只是卡住了。
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print q.get()
pool.close()
pool.join()
if __name__ == '__main__':
main()
我知道直接用 multiprocessing.Process 可能会有效,还有其他库也能做到这一点,但我不想放弃标准库的函数,因为我觉得它们很合适,除非我确认不是因为我知识不足才无法利用它们。
谢谢。
2 个回答
1
在使用 fork
启动方法(也就是在Unix系统上),你不需要使用那个初始化的技巧,像顶层答案里说的那样。
只要把 mp.Queue
定义为全局变量,子进程就会正确地继承它。
原作者的例子在Linux上使用Python 3.9.7运行得很好(代码稍微调整了一下):
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x * x
def main():
pool = mp.Pool(5)
pool.imap_unordered(f, range(1, 6))
time.sleep(1)
for _ in range(1, 6):
print(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出:
2
1
3
4
5
虽然已经过去12年了,但我想确保任何遇到这个问题的Linux用户都知道,顶层答案里的技巧只有在你不能使用fork的时候才需要。
57
诀窍是把队列作为参数传递给初始化器。这似乎适用于所有的池调度方法。
import multiprocessing as mp
def f(x):
f.q.put('Doing: ' + str(x))
return x*x
def f_init(q):
f.q = q
def main():
jobs = range(1,6)
q = mp.Queue()
p = mp.Pool(None, f_init, [q])
results = p.imap(f, jobs)
p.close()
for i in range(len(jobs)):
print q.get()
print results.next()
if __name__ == '__main__':
main()