Python多进程PicklingError: 无法序列化<type 'function'>
抱歉,我无法用更简单的例子重现这个错误,而且我的代码太复杂,没法直接贴出来。如果我在IPython环境中运行程序,而不是在普通的Python中,事情就能顺利进行。
我查了一些之前关于这个问题的笔记。那些问题都是因为在类的函数里用池(pool)去调用定义在类里的函数。但我这次不是这种情况。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我会很感激任何帮助。
更新:我用来序列化(pickle)的函数是在模块的顶层定义的。虽然它调用了一个包含嵌套函数的函数。也就是说,f()
调用 g()
,g()
调用 h()
,而 h()
里有一个嵌套函数 i()
,我正在调用 pool.apply_async(f)
。f()
、g()
、h()
都是在顶层定义的。我尝试了一个更简单的例子,使用这个模式,结果是可以的。
10 个回答
当你在使用 multiprocessing
时遇到这个问题,一个简单的解决办法是把 Pool
换成 ThreadPool
。这样做只需要改一下导入的部分,其他代码不用动。
from multiprocessing.pool import ThreadPool as Pool
这样做的原因是,ThreadPool 和主线程共享内存,而不是创建一个新的进程——这意味着不需要进行序列化。
不过,这种方法也有缺点,因为 Python 在处理线程方面并不是特别优秀。它使用了一种叫做全局解释器锁(Global Interpreter Lock)的机制来保证线程安全,这可能会在某些情况下导致速度变慢。然而,如果你主要是在和其他系统交互(比如运行 HTTP 命令、和数据库对话、写入文件系统),那么你的代码通常不会受到 CPU 的限制,影响也不会太大。实际上,我发现当我在写 HTTP/HTTPS 性能测试时,这种线程模型的开销和延迟都更小,因为创建新进程的开销要比创建新线程的开销大得多,而程序在那时主要是在等待 HTTP 响应。
所以,如果你在 Python 用户空间处理大量数据,这可能不是最好的方法。
我建议使用 pathos.multiprocessing
,而不是 multiprocessing
。pathos.multiprocessing
是 multiprocessing
的一个分支,它使用了 dill
。dill
可以把几乎所有的 Python 对象都转化成可以传输的格式,这样你就能在并行处理时传送更多的数据。pathos
的这个分支还可以直接处理多个参数的函数,这在使用类的方法时特别有用。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
你可以在这里获取 pathos
(如果你需要的话,也可以获取 dill
):
https://github.com/uqfoundation
这里有一份可以被序列化的内容列表。特别要注意的是,只有在模块的顶层定义的函数才能被序列化。
这段代码:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
会产生一个和你提到的错误几乎一模一样的错误:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题在于,pool
的方法都使用了mp.SimpleQueue
来将任务传递给工作进程。所有通过mp.SimpleQueue
传递的内容都必须是可以被序列化的,而foo.work
不能被序列化,因为它不是在模块的顶层定义的。
可以通过在顶层定义一个函数来解决这个问题,这个函数调用foo.work()
:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
注意,foo
是可以被序列化的,因为Foo
是在顶层定义的,并且foo.__dict__
是可以被序列化的。