多进程:如何在类中使用Pool.map函数?
当我运行类似这样的代码:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
它运行得很好。不过,把它放到一个类的函数里:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
就出现了以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我看到过Alex Martelli发的一个帖子,讨论过类似的问题,但内容不是很清楚。
20 个回答
多进程和序列化在标准库中有些问题和限制,除非你使用其他库。
如果你使用一个叫做 pathos.multiprocessing
的库,它是 multiprocessing
的一个分支,你就可以在多进程的 map
函数中直接使用类和类的方法。这是因为它使用了 dill
,而不是 pickle
或 cPickle
,而 dill
可以序列化几乎所有的 Python 对象。
pathos.multiprocessing
还提供了一个异步的 map 函数……而且它可以处理多个参数的函数,比如 map(math.pow, [1,2,3], [4,5,6])
。
可以查看讨论内容: 多进程和 dill 可以一起做什么?
还有: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
它甚至可以处理你最初写的代码,而无需修改,直接从解释器中运行。 为什么还要使用其他更脆弱和特定于单一情况的方法呢?
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
... def run(self):
... def f(x):
... return x*x
... p = Pool()
... return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
在这里获取代码: https://github.com/uqfoundation/pathos
而且,为了展示它还能做些什么:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
我之前无法使用已经发布的代码,因为使用“multiprocessing.Pool”的代码不能和lambda表达式一起工作,而不使用“multiprocessing.Pool”的代码则会根据工作项的数量生成很多进程。
我对代码进行了调整,使它能够生成预定义数量的工作进程,并且只有在有空闲的工作进程时才会遍历输入列表。我还为工作进程启用了“守护”模式,这样按下ctrl-c时可以按预期工作。
import multiprocessing
def fun(f, q_in, q_out):
while True:
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
q_in = multiprocessing.Queue(1)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
for _ in range(nprocs)]
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[q_in.put((None, None)) for _ in range(nprocs)]
res = [q_out.get() for _ in range(len(sent))]
[p.join() for p in proc]
return [x for i, x in sorted(res)]
if __name__ == '__main__':
print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
我也对pool.map能接受的函数类型限制感到烦恼。于是我写了下面的代码来绕过这个限制。看起来这个方法有效,甚至可以递归使用parmap。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe, x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f, X):
pipe = [Pipe() for x in X]
proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
[p.start() for p in proc]
[p.join() for p in proc]
return [p.recv() for (p, c) in pipe]
if __name__ == '__main__':
print parmap(lambda x: x**x, range(1, 5))