Using multiprocessing.Pool with bound methods
I'm trying to use `multiprocessing.Pool` in my code, but I get an exception:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
My problem is that I don't know how to apply this solution to my code.
My code looks roughly like this:
```python
import multiprocessing
import subprocess


class G(object):
    def submit(self, data):
        cmd = self.createCommand(data)
        subprocess.call(cmd, shell=True)
        # call for a short command

    def main(self):
        self.pool = multiprocessing.Pool()
        while True:
            data = self.GenerateData()
            self.pool.apply_async(self.submit, args=(data,))
```
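Some notes:
- The main `while` loop is meant to run for a long time (days).
- I'm using `pool` to improve performance; if you have a better solution, I'd be glad to hear it.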
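On the request for a better approach: each task here only launches an external command and waits for it, so most of the time is spent in the child process rather than in Python. Under that assumption a thread pool may be enough, and because threads share memory, `multiprocessing.pool.ThreadPool` never pickles the submitted method, which sidesteps both errors in this question. A minimal sketch, assuming Python 3: `create_command` is a hypothetical stand-in for `createCommand` that runs a tiny Python one-liner (as an argument list, not with `shell=True`), `items` stands in for `GenerateData()`, and a `threading.BoundedSemaphore` caps how many tasks are pending at once so a long-running loop does not grow the pool's queue without limit.

```python
import subprocess
import sys
import threading
from multiprocessing.pool import ThreadPool


class G(object):
    def __init__(self, workers=4):
        # Threads share memory, so nothing submitted here is ever pickled,
        # and storing the pool on self is harmless.
        self.pool = ThreadPool(workers)
        # At most this many tasks may be queued or running at any moment.
        self.slots = threading.BoundedSemaphore(workers * 2)

    def create_command(self, data):
        # Hypothetical stand-in for createCommand(): a short Python one-liner.
        return [sys.executable, '-c', 'print({!r})'.format(data)]

    def submit(self, data):
        try:
            # The worker thread simply waits here while the child process runs.
            return subprocess.call(self.create_command(data))
        finally:
            self.slots.release()  # free a slot so the producer can continue

    def main(self, items):
        # items is a hypothetical stand-in for repeated GenerateData() calls.
        results = []
        for data in items:
            self.slots.acquire()  # blocks while too many tasks are pending
            results.append(self.pool.apply_async(self.submit, args=(data,)))
        self.pool.close()
        self.pool.join()
        # Results are kept only so the example can report exit codes;
        # an endless loop would use a callback instead of growing this list.
        return [r.get() for r in results]


if __name__ == '__main__':
    print(G(workers=2).main(['a', 'b', 'c']))  # list of exit codes
```

If the commands turn out to be CPU-heavy inside Python itself, a process pool is still the right tool; the semaphore pattern works the same way with `multiprocessing.Pool`.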
Update:
After applying @unutbu's solution, I get the next exception:
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
All the solutions I've found so far talk about `Queue.Queue` and `mp.Pool.map`, but I'm not using either of those, so I can't figure out what to do.
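The second error has a different cause than the first. Pickling a bound method such as `self.submit` also pickles the instance it is bound to, and in the question's code that instance carries `self.pool`; a `Pool` holds thread locks, and locks cannot be pickled. A minimal sketch, assuming Python 3 (where bound methods pickle without any `copy_reg` registration), with a hypothetical `Worker` class whose `threading.Lock` stands in for the pool: the plain version fails to pickle, while the version that drops the lock in `__getstate__` succeeds. Simply not storing the pool on `self` avoids the problem as well.

```python
import pickle
import threading


class Worker(object):
    """Hypothetical stand-in for the question's class G."""

    def __init__(self):
        # A lock stands in for the Pool stored on self.pool; a Pool holds
        # locks internally, so it cannot be pickled either.
        self.lock = threading.Lock()

    def submit(self, data):
        return 'processing {}'.format(data)


class PicklableWorker(Worker):
    def __getstate__(self):
        # Copy the instance dict and leave out the unpicklable attribute.
        state = self.__dict__.copy()
        del state['lock']
        return state

    def __setstate__(self, state):
        # Restore the remaining attributes and give the copy a fresh lock.
        self.__dict__.update(state)
        self.lock = threading.Lock()


def can_pickle(obj):
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


# Pickling a bound method pickles its instance, lock and all.
print(can_pickle(Worker().submit))           # False
print(can_pickle(PicklableWorker().submit))  # True
```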
1 Answer
Here is the solution proposed by Steven Bethard, applied to your case:
```python
import multiprocessing as mp
import time
import copy_reg
import types


def _pickle_method(method):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    # Private methods (__name without a trailing __) are stored under a
    # mangled name such as _ClassName__name, so rebuild that name here.
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
        if cls_name:
            func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    # Walk the MRO to find the class that defines the function, then
    # re-bind it to the unpickled instance.
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)


class G(object):
    def submit(self, data):
        print('processing {}'.format(data))
        # cmd = self.createCommand(data)
        # subprocess.call(cmd, shell=True)
        # call for a short command
        time.sleep(2)

    def main(self):
        # The pool is a local variable, not self.pool: pickling self.submit
        # pickles self, and a Pool (which holds thread locks) cannot be pickled.
        pool = mp.Pool()
        while True:
            data = (1, 2, 3)
            # apply_async returns immediately, so this loop queues tasks far
            # faster than the workers finish them; real code should throttle.
            pool.apply_async(self.submit, args=(data,))


if __name__ == '__main__':
    g = G()
    g.main()
```
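The `copy_reg` workaround above targets Python 2. On Python 3 (as far as I know, from 3.4 on) bound methods can be pickled directly, because they reduce to a `getattr(instance, name)` call, so the registration can be dropped. A minimal sketch of the same idea for Python 3, assuming a small fixed number of tasks in place of the endless loop so that it terminates; the pool again stays in a local variable so that pickling `self.submit` does not drag the pool along.

```python
import multiprocessing as mp
import time


class G(object):
    def submit(self, data):
        # Stand-in for building and running the real command.
        time.sleep(0.1)
        return 'processed {}'.format(data)

    def main(self, n_tasks=4):
        # Local pool: only self (which has no pool attribute) gets pickled.
        with mp.Pool(processes=2) as pool:
            results = [pool.apply_async(self.submit, args=(i,))
                       for i in range(n_tasks)]
            # get() waits for each task and re-raises any worker exception.
            return [r.get() for r in results]


if __name__ == '__main__':
    print(G().main())
```

Calling `get()` before the `with` block exits matters: leaving the block calls `terminate()` on the pool, which would discard any unfinished tasks.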