正确实现管道中的多进程

17 投票
6 回答
14332 浏览
提问于 2025-04-17 07:06

我想了解一下如何正确地使用多进程。假设我有一个列表 [1,2,3,4,5],这个列表是由函数 f1 生成的,并且它被放入一个 Queue(左边的绿色圆圈)。现在我启动两个进程,从这个队列中取数据(通过在进程中执行 f2)。它们处理这些数据,比如把值翻倍,然后把结果写入第二个队列。接着,函数 f3 读取这些数据并打印出来。

数据流布局

在这些函数内部,有一个循环,试图一直从队列中读取数据。我该如何停止这个进程呢?

想法 1

f1 不仅发送列表,还发送一个 None 对象或者一个自定义对象,比如 class PipelineTerminator: pass,这种对象会一直传递下去。f3 现在等待 None 的到来,当它到达时,就会跳出循环。问题是,可能有一个 f2 在读取并传递 None,而另一个 f2 还在处理一个数字。这样最后一个值就会丢失。

想法 2

f3f1。也就是说,函数 f1 生成数据和管道,启动进程执行 f2 并提供所有数据。在启动和提供数据后,它监听第二个管道,简单地计数和处理接收到的对象。因为它知道提供了多少数据,所以可以终止执行 f2 的进程。但如果目标是建立一个处理管道,不同的步骤应该是可分开的。因此 f1f2f3 是管道的不同部分,而耗时的步骤是并行进行的。

想法 3

管道想法 3

管道的每个部分都是一个函数,这个函数可以根据需要启动进程,并负责管理它们。它知道有多少数据进入,多少数据被返回(可能用 yield)。所以安全地传递一个 None 对象是没问题的。

setup child processes 

execute thread one and two and wait until both finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads finished: kill process pool and return.

(与其使用线程,不如考虑一个更聪明的解决方案。)

所以……

我实现了一个遵循想法 2 的解决方案,提供数据并等待结果到达,但这并不是真正的独立函数连接在一起的管道。它适用于我需要处理的任务,但维护起来很困难。

我想听听你们是如何实现管道的(在一个进程中使用生成器函数等很简单,但在多个进程中呢?)以及通常是如何管理它们的。

6 个回答

2

对于想法1,怎么样:

import multiprocessing as mp

sentinel=None

def f2(inq,outq):
    while True:
        val=inq.get()
        if val is sentinel:
            break
        outq.put(val*2)

def f3(outq):
    while True:
        val=outq.get()
        if val is sentinel:
            break
        print(val)

def f1():
    num_workers=2
    inq=mp.Queue()
    outq=mp.Queue()
    for i in range(5):
        inq.put(i)
    for i in range(num_workers):        
        inq.put(sentinel)
    workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)]
    printer=mp.Process(target=f3,args=(outq,))
    for w in workers:
        w.start()
    printer.start()
    for w in workers:
        w.join()
    outq.put(sentinel)
    printer.join()

if __name__=='__main__':
    f1()

与想法1的描述唯一不同的是,f2在收到信号后会跳出while-loop(这样就结束了自己)。而f1会一直等到所有的工作完成(通过w.join()),然后再给f3发送信号(表示它可以跳出自己的while-loop)。

12

使用MPipe模块,你只需要这样做:

from mpipe import OrderedStage, Pipeline

def f1(value):
    return value * 2

def f2(value):
    print(value)

s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))

for task in 1, 2, 3, 4, 5, None:
    p.put(task)

上面的代码会运行4个进程

  • 两个用于第一阶段(函数f1
  • 一个用于第二阶段(函数f2
  • 还有一个是主程序,用来给这个流程提供数据。

MPipe使用手册里有一些关于如何内部关闭进程的解释,使用None作为最后一个任务。

要运行这段代码,首先安装MPipe:

virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py

输出结果:

2
4
6
8
10
1

如果用第一种想法,每个工作进程(f2)在完成后放一个带有自己标识的自定义对象,这样有什么问题呢?然后f3只需要结束那个工作进程,直到没有工作进程剩下为止。

另外,Python 3.2 新增了一个叫做 concurrent.futures 的库,它应该能以“正确的方式”做到你想要的事情(商标) - http://docs.python.org/dev/library/concurrent.futures.html

也许可以找到一个 concurrent.futures 的版本,适用于 Python 2.x 系列。

撰写回答