多进程中返回值的队列替代方案
在Python的多进程编程中,有没有类似于Process.return_value()的东西?为什么没有,除了使用队列还有什么其他选择?
背景: 目前,我的代码中有一些函数f,这些函数处理大量的数据序列,这些序列是numpy数组(a,b)。
b= f(a)
def f(a):
# do some work here
return b
我可以像这样对每个函数进行并行处理:
plist=[]
for i in range(ncores):
p= Process(target=f, args=(a[i::ncores]))
p.start()
plist.append(p)
这样可以让这部分代码的运行速度提高ncores倍。问题是如何获取返回值:
for i, p in enumerate(plist):
p.join()
b[i::ncores]= p.return_value()
我找不到通过p来获取返回值的方法。我不明白为什么这样做不行,因为p可以和核心上的进程进行通信。
现在,我正在使用队列来获取返回值,但我觉得使用队列有点麻烦:我必须把队列和一个标识符都传递给每个函数,并把结果和ID都放入队列中:
def f(a, Queue=Queue, ID=-1):
# do some work here
if ID==-1:
# normal interface
return b
else:
# parallel interface
Queue.put([ID, b])
然后当我从队列中读取时,我还得把ID和原始数组对应起来:
for i in range(ncores):
ID, result= Queue.get()
b[ID::ncores]= result
plist[ID].join()
有没有更简单的方法来实现这个?
我试过传递一个字典或列表作为关键字来存储结果,这在多线程中有效,但在多进程中不行。
1 个回答
1
返回值是什么?
如果返回的东西很简单,比如一个数组或者一个数字,你可以把一个共享内存的容器传给Process
实例,这样它就可以把结果存储在里面。如果我们想象一下,你只是想在多个进程中对一个数组进行求和:
from multiprocessing import Process,Value
import ctypes as C
import numpy as np
def func(input,output):
# do something with input
# put the result in output
output.value = input.sum()
def launch_processes(n):
outputs = []
jobs = []
for ii in range(n):
input = np.arange(ii)
# here use a synchronised wrapper to store the result of the func run
# This can be replaced with an Array instance if an array is returned
output = Value(C.c_size_t)
job = Process(target=func,args=(input,output))
outputs.append(output)
jobs.append(job)
job.start()
for job in jobs:
job.join()
for output in outputs:
print output.value
launch_processes(10)
更整洁的做法是通过继承Process
类,把这个任务封装在一个单独的对象里:
from multiprocessing import Process,Value
import ctypes as C
import numpy as np
class Job(Process):
def __init__(self,input):
super(Job,self).__init__()
self.retval = Value(C.c_size_t)
self.input = input
def run(self):
self.retval.value = self.input.sum()
def launch_processes(n):
jobs = []
for ii in range(n):
job = Job(np.arange(ii))
job.start()
jobs.append(job)
for job in jobs:
job.join()
print job.retval.value
launch_processes(10)