Python多进程PicklingError: 无法序列化<type 'function'>

380 投票
10 回答
475100 浏览
提问于 2025-04-17 09:56

抱歉,我无法用更简单的例子重现这个错误,而且我的代码太复杂,没法直接贴出来。如果我在IPython环境中运行程序,而不是在普通的Python中,事情就能顺利进行。

我查了一些之前关于这个问题的笔记。那些问题都是因为在类的函数里用池(pool)去调用定义在类里的函数。但我这次不是这种情况。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我会很感激任何帮助。

更新:我用来序列化(pickle)的函数是在模块的顶层定义的。虽然它调用了一个包含嵌套函数的函数。也就是说,f() 调用 g()g() 调用 h(),而 h() 里有一个嵌套函数 i(),我正在调用 pool.apply_async(f)f()g()h() 都是在顶层定义的。我尝试了一个更简单的例子,使用这个模式,结果是可以的。

10 个回答

66

当你在使用 multiprocessing 时遇到这个问题,一个简单的解决办法是把 Pool 换成 ThreadPool。这样做只需要改一下导入的部分,其他代码不用动。

from multiprocessing.pool import ThreadPool as Pool

这样做的原因是,ThreadPool 和主线程共享内存,而不是创建一个新的进程——这意味着不需要进行序列化。

不过,这种方法也有缺点,因为 Python 在处理线程方面并不是特别优秀。它使用了一种叫做全局解释器锁(Global Interpreter Lock)的机制来保证线程安全,这可能会在某些情况下导致速度变慢。然而,如果你主要是在和其他系统交互(比如运行 HTTP 命令、和数据库对话、写入文件系统),那么你的代码通常不会受到 CPU 的限制,影响也不会太大。实际上,我发现当我在写 HTTP/HTTPS 性能测试时,这种线程模型的开销和延迟都更小,因为创建新进程的开销要比创建新线程的开销大得多,而程序在那时主要是在等待 HTTP 响应。

所以,如果你在 Python 用户空间处理大量数据,这可能不是最好的方法。

142

我建议使用 pathos.multiprocessing,而不是 multiprocessingpathos.multiprocessingmultiprocessing 的一个分支,它使用了 dilldill 可以把几乎所有的 Python 对象都转化成可以传输的格式,这样你就能在并行处理时传送更多的数据。pathos 的这个分支还可以直接处理多个参数的函数,这在使用类的方法时特别有用。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

你可以在这里获取 pathos(如果你需要的话,也可以获取 dill): https://github.com/uqfoundation

431

这里有一份可以被序列化的内容列表。特别要注意的是,只有在模块的顶层定义的函数才能被序列化。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

会产生一个和你提到的错误几乎一模一样的错误:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题在于,pool的方法都使用了mp.SimpleQueue来将任务传递给工作进程。所有通过mp.SimpleQueue传递的内容都必须是可以被序列化的,而foo.work不能被序列化,因为它不是在模块的顶层定义的。

可以通过在顶层定义一个函数来解决这个问题,这个函数调用foo.work()

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

注意,foo是可以被序列化的,因为Foo是在顶层定义的,并且foo.__dict__是可以被序列化的。

撰写回答