正确实现管道中的多进程
我想了解一下如何正确地使用多进程。假设我有一个列表 [1,2,3,4,5]
,这个列表是由函数 f1
生成的,并且它被放入一个 Queue
(左边的绿色圆圈)。现在我启动两个进程,从这个队列中取数据(通过在进程中执行 f2
)。它们处理这些数据,比如把值翻倍,然后把结果写入第二个队列。接着,函数 f3
读取这些数据并打印出来。
在这些函数内部,有一个循环,试图一直从队列中读取数据。我该如何停止这个进程呢?
想法 1
f1
不仅发送列表,还发送一个 None
对象或者一个自定义对象,比如 class PipelineTerminator: pass
,这种对象会一直传递下去。f3
现在等待 None
的到来,当它到达时,就会跳出循环。问题是,可能有一个 f2
在读取并传递 None
,而另一个 f2
还在处理一个数字。这样最后一个值就会丢失。
想法 2
f3
是 f1
。也就是说,函数 f1
生成数据和管道,启动进程执行 f2
并提供所有数据。在启动和提供数据后,它监听第二个管道,简单地计数和处理接收到的对象。因为它知道提供了多少数据,所以可以终止执行 f2
的进程。但如果目标是建立一个处理管道,不同的步骤应该是可分开的。因此 f1
、f2
和 f3
是管道的不同部分,而耗时的步骤是并行进行的。
想法 3
管道的每个部分都是一个函数,这个函数可以根据需要启动进程,并负责管理它们。它知道有多少数据进入,多少数据被返回(可能用 yield
)。所以安全地传递一个 None
对象是没问题的。
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(与其使用线程,不如考虑一个更聪明的解决方案。)
所以……
我实现了一个遵循想法 2 的解决方案,提供数据并等待结果到达,但这并不是真正的独立函数连接在一起的管道。它适用于我需要处理的任务,但维护起来很困难。
我想听听你们是如何实现管道的(在一个进程中使用生成器函数等很简单,但在多个进程中呢?)以及通常是如何管理它们的。
6 个回答
对于想法1,怎么样:
import multiprocessing as mp
sentinel=None
def f2(inq,outq):
while True:
val=inq.get()
if val is sentinel:
break
outq.put(val*2)
def f3(outq):
while True:
val=outq.get()
if val is sentinel:
break
print(val)
def f1():
num_workers=2
inq=mp.Queue()
outq=mp.Queue()
for i in range(5):
inq.put(i)
for i in range(num_workers):
inq.put(sentinel)
workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)]
printer=mp.Process(target=f3,args=(outq,))
for w in workers:
w.start()
printer.start()
for w in workers:
w.join()
outq.put(sentinel)
printer.join()
if __name__=='__main__':
f1()
与想法1的描述唯一不同的是,f2
在收到信号后会跳出while-loop
(这样就结束了自己)。而f1
会一直等到所有的工作完成(通过w.join()
),然后再给f3
发送信号(表示它可以跳出自己的while-loop
)。
使用MPipe模块,你只需要这样做:
from mpipe import OrderedStage, Pipeline
def f1(value):
return value * 2
def f2(value):
print(value)
s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))
for task in 1, 2, 3, 4, 5, None:
p.put(task)
上面的代码会运行4个进程:
- 两个用于第一阶段(函数f1)
- 一个用于第二阶段(函数f2)
- 还有一个是主程序,用来给这个流程提供数据。
MPipe使用手册里有一些关于如何内部关闭进程的解释,使用None
作为最后一个任务。
要运行这段代码,首先安装MPipe:
virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py
输出结果:
2
4
6
8
10
如果用第一种想法,每个工作进程(f2)在完成后放一个带有自己标识的自定义对象,这样有什么问题呢?然后f3只需要结束那个工作进程,直到没有工作进程剩下为止。
另外,Python 3.2 新增了一个叫做 concurrent.futures 的库,它应该能以“正确的方式”做到你想要的事情(商标) - http://docs.python.org/dev/library/concurrent.futures.html
也许可以找到一个 concurrent.futures 的版本,适用于 Python 2.x 系列。