使用多进程的Python装饰器失败

20 投票
4 回答
13147 浏览
提问于 2025-04-17 13:11

我想在一个函数上使用装饰器,然后把这个函数传给多进程池。但是,代码出错了,提示“PicklingError: Can't pickle : attribute lookup __builtin__.function failed”。我不太明白为什么会出错。我觉得这应该是个简单的问题,但我找不到原因。下面是一个最小的“可工作”示例。我以为使用functools这个函数就足够让它正常工作了。

如果我把函数的装饰器注释掉,它就能正常运行。那我在multiprocessing方面到底理解错了什么呢?有没有办法让它正常工作?

编辑:在添加了一个可调用类装饰器一个函数装饰器后,结果是函数装饰器按预期工作。可调用类装饰器依然出错。那这个可调用类版本有什么特别之处,让它无法被序列化呢?

import random
import multiprocessing
import functools

class my_decorator_class(object):
    def __init__(self, target):
        self.target = target
        try:
            functools.update_wrapper(self, target)
        except:
            pass

    def __call__(self, elements):
        f = []
        for element in elements:
            f.append(self.target([element])[0])
        return f

def my_decorator_function(target):
    @functools.wraps(target)
    def inner(elements):
        f = []
        for element in elements:
            f.append(target([element])[0])
        return f
    return inner

@my_decorator_function
def my_func(elements):
    f = []
    for element in elements:
        f.append(sum(element))
    return f

if __name__ == '__main__':
    elements = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(my_func, ([e],)) for e in elements]
    pool.close()
    f = [r.get()[0] for r in results]
    print(f)

4 个回答

1

如果你也像我一样非常想要使用装饰器,可以通过在函数字符串上使用 exec() 命令来绕过之前提到的序列化问题。

我想要能够将所有参数传递给原始函数,然后依次使用它们。以下是我为此写的代码。

首先,我创建了一个 make_functext() 函数,用来把目标函数对象转换成字符串。为此,我使用了 inspect 模块中的 getsource() 函数(详细信息可以查看 这里,请注意它无法从编译后的代码等中获取源代码)。代码如下:

from inspect import getsource

def make_functext(func):
    ft = '\n'.join(getsource(func).split('\n')[1:]) # Removing the decorator, of course
    ft = ft.replace(func.__name__, 'func')          # Making function callable with 'func'
    ft = ft.replace('#§ ', '').replace('#§', '')    # For using commented code starting with '#§'
    ft = ft.strip()                                 # In case the function code was indented
    return ft

这个函数在下面的 _worker() 函数中使用,这个函数将作为进程的目标:

def _worker(functext, args):
    scope = {}               # This is needed to keep executed definitions
    exec(functext, scope)
    scope['func'](args)      # Using func from scope

最后,这是我的装饰器:

from multiprocessing import Process 

def parallel(num_processes, **kwargs):
    def parallel_decorator(func, num_processes=num_processes):
        functext = make_functext(func)
        print('This is the parallelized function:\n', functext)
        def function_wrapper(funcargs, num_processes=num_processes):
            workers = []
            print('Launching processes...')
            for k in range(num_processes):
                p = Process(target=_worker, args=(functext, funcargs[k])) # use args here
                p.start()
                workers.append(p)
        return function_wrapper
    return parallel_decorator

代码最终可以通过定义一个这样的函数来使用:

@parallel(4)
def hello(args):
    #§ from time import sleep     # use '#§' to avoid unnecessary (re)imports in main program
    name, seconds = tuple(args)   # unpack args-list here
    sleep(seconds)
    print('Hi', name)

... 现在可以这样调用:

hello([['Marty', 0.5],
       ['Catherine', 0.9],
       ['Tyler', 0.7],
       ['Pavel', 0.3]])

... 输出结果是:

This is the parallelized function:
 def func(args):
        from time import sleep
        name, seconds = tuple(args)
        sleep(seconds)
        print('Hi', name)
Launching processes...
Hi Pavel
Hi Marty
Hi Tyler
Hi Catherine

感谢阅读,这是我第一次发帖。如果你发现任何错误或不好的做法,请随时留言。我知道这些字符串转换的方式有点不太干净……

4

我在使用装饰器和多进程时也遇到了一些问题。不确定这是不是和你遇到的问题一样:

我的代码是这样的:

from multiprocessing import Pool

def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        print "I'm decorating"
        return f(*args, **kwargs)
    return _decorate_func

@decorate_func
def actual_func(x):
    return x ** 2

my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(actual_func,(2,))
print result.get()

当我运行这段代码时,我得到了这个:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    print result.get()
  File "somedirectory_too_lengthy_to_put_here/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我通过定义一个新的函数来包裹装饰器中的函数来解决这个问题,而不是使用装饰器的语法

from multiprocessing import Pool

def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        print "I'm decorating"
        return f(*args, **kwargs)
    return _decorate_func

def actual_func(x):
    return x ** 2

def wrapped_func(*args, **kwargs):
    return decorate_func(actual_func)(*args, **kwargs)

my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(wrapped_func,(2,))
print result.get()

代码运行得非常顺利,我得到了:

I'm decorating
4

我对Python不是很熟悉,但这个解决方案帮我解决了问题

18

这个问题是,pickle需要一种方法来重新组合你所序列化的所有内容。你可以在这里查看可以被序列化的内容列表:

http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled

当你序列化my_func时,以下几个部分需要被序列化:

  • 一个叫my_funcmy_decorator_class的实例。

    这没问题。pickle会存储这个类的名字,并序列化它的__dict__内容。当你反序列化时,它会用名字找到这个类,然后创建一个实例并填充__dict__的内容。不过,__dict__的内容会带来一个问题……

  • 存储在my_func.target里的原始my_func的实例。

    这就不太好了。它是一个顶层的函数,通常这些是可以被序列化的。pickle会存储这个函数的名字。然而,问题在于,名字“my_func”不再和未装饰的函数绑定,而是和装饰过的函数绑定。这意味着pickle无法查找未装饰的函数来重新创建对象。可惜的是,pickle没有办法知道它试图序列化的对象总是可以通过名字__main__.my_func找到。

你可以这样修改,它就能正常工作:

import random
import multiprocessing
import functools

class my_decorator(object):
    def __init__(self, target):
        self.target = target
        try:
            functools.update_wrapper(self, target)
        except:
            pass

    def __call__(self, candidates, args):
        f = []
        for candidate in candidates:
            f.append(self.target([candidate], args)[0])
        return f

def old_my_func(candidates, args):
    f = []
    for c in candidates:
        f.append(sum(c))
    return f

my_func = my_decorator(old_my_func)

if __name__ == '__main__':
    candidates = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(my_func, ([c], {})) for c in candidates]
    pool.close()
    f = [r.get()[0] for r in results]
    print(f)

你观察到装饰器函数在类不工作时仍然有效。我认为这是因为functools.wraps修改了被装饰的函数,使它拥有被装饰函数的名字和其他属性。就pickle模块而言,它无法区分这是否是一个普通的顶层函数,因此它通过存储名字来序列化。反序列化时,名字绑定到装饰过的函数上,所以一切都能正常运作。

撰写回答