使用python multiprocess.pool.map_async()时无法pickle <type 'thread.lock'>
我在尝试在一个类的方法上使用 map_async
时遇到了这个错误:PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
。我的代码是:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class MyClass(object):
def Submit(self,cmd):
subprocess.call(cmd, shell=True)
def RunTest(self):
cmds = []
for i in range(50):
cmd = CreateCmd(self)
cmds.append(cmd)
self.pool.map_async(self.Submit, cmds)
def Main(self):
self.pool = mp.pool
while True:
RunTest(self)
if __name__ == "__main__":
MyClass()
当 Submit
在类外部时可以正常工作,但像这样我就会遇到错误。此外,MyClass
还有一些我没有写的其他方法和属性,其中一个是日志记录器,这可能是问题所在吗?
1 个回答
4
我用了一些不同的导入方式来构建你的代码,特别是用 dill
替代了 pickle
。我还用了一个叫 pathos.multiprocessing
的库,它是 multiprocessing
的一个分支,支持 dill
。这样我就可以对你的类方法和绑定方法进行序列化了。我没有理会你教 copy_reg
如何序列化模块的部分,因为 dill
已经能做到这一点了。
我对你的代码做了一些修改,因为它原本是不能工作的。我还得自己写一个 CreateCmd 函数,因为你没有提供。另外,这段代码现在可以启动多进程任务……但你永远得不到结果,因为你没有请求结果。你到底想要做什么呢?
总之,这里有一些和你的代码类似但能正常工作的代码。虽然它仍然不会给你任何有价值的结果,只是证明它可以序列化并且代码可以运行。请发布可以运行的代码,并且能抛出你所报告的错误。
>>> import dill as pickle
>>> import subprocess
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def CreateCmd(cmd):
... return 'sleep {0}'.format(cmd)
>>>
>>> class MyClass(object):
... def Submit(self, cmd):
... subprocess.call(cmd, shell=True)
... def RunTest(self):
... cmds = []
... for i in range(50):
... cmd = CreateCmd(i)
... cmds.append(cmd)
... self.pool.amap(self.Submit, cmds) # equivalent to map_async
... def Main(self):
... self.pool = Pool()
... self.RunTest()
...
>>> pickle.loads(pickle.dumps(MyClass))
<class '__main__.MyClass'>
>>> pickle.loads(pickle.dumps(MyClass.RunTest))
<unbound method MyClass.RunTest>
>>> x = MyClass()
>>> pickle.loads(pickle.dumps(x.RunTest))
<bound method MyClass.RunTest of <__main__.MyClass object at 0x10d015b10>>
>>> x.Main()
>>> x.Submit('sleep 1')
>>> # use get to get the result… so 'sleep' is felt by the script
>>> res = x.pool.amap(x.Submit, (CreateCmd(i) for i in range(10)))
>>> res.get()
[None, None, None, None, None, None, None, None, None, None]
顺便说一下,如果你需要 dill
或 pathos
,可以在这里找到:https://github.com/uqfoundation
对了,如果你想序列化一个线程锁,也可以做到。
>>> import dill as pickle
>>> import threading
>>> lock = threading.Lock()
>>>
>>> pickle.loads(pickle.dumps(lock))
<thread.lock object at 0x10c534650>