使用多进程的Python装饰器失败
我想在一个函数上使用装饰器,然后把这个函数传给多进程池。但是,代码出错了,提示“PicklingError: Can't pickle : attribute lookup __builtin__
.function failed”。我不太明白为什么会出错。我觉得这应该是个简单的问题,但我找不到原因。下面是一个最小的“可工作”示例。我以为使用functools
这个函数就足够让它正常工作了。
如果我把函数的装饰器注释掉,它就能正常运行。那我在multiprocessing
方面到底理解错了什么呢?有没有办法让它正常工作?
编辑:在添加了一个可调用类装饰器和一个函数装饰器后,结果是函数装饰器按预期工作。可调用类装饰器依然出错。那这个可调用类版本有什么特别之处,让它无法被序列化呢?
import random
import multiprocessing
import functools
class my_decorator_class(object):
def __init__(self, target):
self.target = target
try:
functools.update_wrapper(self, target)
except:
pass
def __call__(self, elements):
f = []
for element in elements:
f.append(self.target([element])[0])
return f
def my_decorator_function(target):
@functools.wraps(target)
def inner(elements):
f = []
for element in elements:
f.append(target([element])[0])
return f
return inner
@my_decorator_function
def my_func(elements):
f = []
for element in elements:
f.append(sum(element))
return f
if __name__ == '__main__':
elements = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
pool = multiprocessing.Pool(processes=4)
results = [pool.apply_async(my_func, ([e],)) for e in elements]
pool.close()
f = [r.get()[0] for r in results]
print(f)
4 个回答
如果你也像我一样非常想要使用装饰器,可以通过在函数字符串上使用 exec()
命令来绕过之前提到的序列化问题。
我想要能够将所有参数传递给原始函数,然后依次使用它们。以下是我为此写的代码。
首先,我创建了一个 make_functext()
函数,用来把目标函数对象转换成字符串。为此,我使用了 inspect
模块中的 getsource()
函数(详细信息可以查看 这里,请注意它无法从编译后的代码等中获取源代码)。代码如下:
from inspect import getsource
def make_functext(func):
ft = '\n'.join(getsource(func).split('\n')[1:]) # Removing the decorator, of course
ft = ft.replace(func.__name__, 'func') # Making function callable with 'func'
ft = ft.replace('#§ ', '').replace('#§', '') # For using commented code starting with '#§'
ft = ft.strip() # In case the function code was indented
return ft
这个函数在下面的 _worker()
函数中使用,这个函数将作为进程的目标:
def _worker(functext, args):
scope = {} # This is needed to keep executed definitions
exec(functext, scope)
scope['func'](args) # Using func from scope
最后,这是我的装饰器:
from multiprocessing import Process
def parallel(num_processes, **kwargs):
def parallel_decorator(func, num_processes=num_processes):
functext = make_functext(func)
print('This is the parallelized function:\n', functext)
def function_wrapper(funcargs, num_processes=num_processes):
workers = []
print('Launching processes...')
for k in range(num_processes):
p = Process(target=_worker, args=(functext, funcargs[k])) # use args here
p.start()
workers.append(p)
return function_wrapper
return parallel_decorator
代码最终可以通过定义一个这样的函数来使用:
@parallel(4)
def hello(args):
#§ from time import sleep # use '#§' to avoid unnecessary (re)imports in main program
name, seconds = tuple(args) # unpack args-list here
sleep(seconds)
print('Hi', name)
... 现在可以这样调用:
hello([['Marty', 0.5],
['Catherine', 0.9],
['Tyler', 0.7],
['Pavel', 0.3]])
... 输出结果是:
This is the parallelized function:
def func(args):
from time import sleep
name, seconds = tuple(args)
sleep(seconds)
print('Hi', name)
Launching processes...
Hi Pavel
Hi Marty
Hi Tyler
Hi Catherine
感谢阅读,这是我第一次发帖。如果你发现任何错误或不好的做法,请随时留言。我知道这些字符串转换的方式有点不太干净……
我在使用装饰器和多进程时也遇到了一些问题。不确定这是不是和你遇到的问题一样:
我的代码是这样的:
from multiprocessing import Pool
def decorate_func(f):
def _decorate_func(*args, **kwargs):
print "I'm decorating"
return f(*args, **kwargs)
return _decorate_func
@decorate_func
def actual_func(x):
return x ** 2
my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(actual_func,(2,))
print result.get()
当我运行这段代码时,我得到了这个:
Traceback (most recent call last):
File "test.py", line 15, in <module>
print result.get()
File "somedirectory_too_lengthy_to_put_here/lib/python2.7/multiprocessing/pool.py", line 572, in get
raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我通过定义一个新的函数来包裹装饰器中的函数来解决这个问题,而不是使用装饰器的语法
from multiprocessing import Pool
def decorate_func(f):
def _decorate_func(*args, **kwargs):
print "I'm decorating"
return f(*args, **kwargs)
return _decorate_func
def actual_func(x):
return x ** 2
def wrapped_func(*args, **kwargs):
return decorate_func(actual_func)(*args, **kwargs)
my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(wrapped_func,(2,))
print result.get()
代码运行得非常顺利,我得到了:
I'm decorating
4
我对Python不是很熟悉,但这个解决方案帮我解决了问题
这个问题是,pickle需要一种方法来重新组合你所序列化的所有内容。你可以在这里查看可以被序列化的内容列表:
http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled
当你序列化my_func
时,以下几个部分需要被序列化:
一个叫
my_func
的my_decorator_class
的实例。这没问题。pickle会存储这个类的名字,并序列化它的
__dict__
内容。当你反序列化时,它会用名字找到这个类,然后创建一个实例并填充__dict__
的内容。不过,__dict__
的内容会带来一个问题……存储在
my_func.target
里的原始my_func
的实例。这就不太好了。它是一个顶层的函数,通常这些是可以被序列化的。pickle会存储这个函数的名字。然而,问题在于,名字“my_func”不再和未装饰的函数绑定,而是和装饰过的函数绑定。这意味着pickle无法查找未装饰的函数来重新创建对象。可惜的是,pickle没有办法知道它试图序列化的对象总是可以通过名字
__main__.my_func
找到。
你可以这样修改,它就能正常工作:
import random
import multiprocessing
import functools
class my_decorator(object):
def __init__(self, target):
self.target = target
try:
functools.update_wrapper(self, target)
except:
pass
def __call__(self, candidates, args):
f = []
for candidate in candidates:
f.append(self.target([candidate], args)[0])
return f
def old_my_func(candidates, args):
f = []
for c in candidates:
f.append(sum(c))
return f
my_func = my_decorator(old_my_func)
if __name__ == '__main__':
candidates = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
pool = multiprocessing.Pool(processes=4)
results = [pool.apply_async(my_func, ([c], {})) for c in candidates]
pool.close()
f = [r.get()[0] for r in results]
print(f)
你观察到装饰器函数在类不工作时仍然有效。我认为这是因为functools.wraps
修改了被装饰的函数,使它拥有被装饰函数的名字和其他属性。就pickle模块而言,它无法区分这是否是一个普通的顶层函数,因此它通过存储名字来序列化。反序列化时,名字绑定到装饰过的函数上,所以一切都能正常运作。