多进程:如何在类中使用Pool.map函数?

212 投票
20 回答
178641 浏览
提问于 2025-04-16 01:33

当我运行类似这样的代码:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

它运行得很好。不过,把它放到一个类的函数里:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

就出现了以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我看到过Alex Martelli发的一个帖子,讨论过类似的问题,但内容不是很清楚。

20 个回答

78

多进程和序列化在标准库中有些问题和限制,除非你使用其他库。

如果你使用一个叫做 pathos.multiprocessing 的库,它是 multiprocessing 的一个分支,你就可以在多进程的 map 函数中直接使用类和类的方法。这是因为它使用了 dill,而不是 picklecPickle,而 dill 可以序列化几乎所有的 Python 对象。

pathos.multiprocessing 还提供了一个异步的 map 函数……而且它可以处理多个参数的函数,比如 map(math.pow, [1,2,3], [4,5,6])

可以查看讨论内容: 多进程和 dill 可以一起做什么?

还有: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至可以处理你最初写的代码,而无需修改,直接从解释器中运行。 为什么还要使用其他更脆弱和特定于单一情况的方法呢?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

在这里获取代码: https://github.com/uqfoundation/pathos

而且,为了展示它还能做些什么:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
95

我之前无法使用已经发布的代码,因为使用“multiprocessing.Pool”的代码不能和lambda表达式一起工作,而不使用“multiprocessing.Pool”的代码则会根据工作项的数量生成很多进程。

我对代码进行了调整,使它能够生成预定义数量的工作进程,并且只有在有空闲的工作进程时才会遍历输入列表。我还为工作进程启用了“守护”模式,这样按下ctrl-c时可以按预期工作。

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
74

我也对pool.map能接受的函数类型限制感到烦恼。于是我写了下面的代码来绕过这个限制。看起来这个方法有效,甚至可以递归使用parmap。

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

撰写回答