Python:生成一个返回同一类的方法的类方法修饰符

2024-04-29 11:05:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我对decorator不太熟悉,这可能比我第一个decorator项目吃得太多了,但是我想做的是制作一个paralleldecorator,它接受一个看似谦逊地应用于单个参数的函数,并自动与multiprocessing一起分发,并将其转换为一个应用于参数列表的函数。在

我正在跟踪前面的问题this very helpful answer,这样我就可以成功地pickle类实例方法,并且可以得到类似于答案的示例来正常工作。在

这是我第一次尝试并行decorator(在参考了一些线程装饰器的web点击之后)。在

###########
# Imports #
###########
import types, copy_reg, multiprocessing as mp
import pandas, numpy as np
### End Imports

##################
# Module methods #
##################

# Parallel decorator
def parallel(f):

    def executor(*args):
        _pool   = mp.Pool(2)
        _result = _pool.map_async(f, args[1:])
        # I used args[1:] because the input will be a
        # class instance method, so gotta skip over the self object.
        # but it seems like there ought to be a better way...

        _pool.close()
        _pool.join()
        return _result.get()
    return executor
### End parallel

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)
### End _pickle_method

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)
### End _unpickle_method

# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
### End Module methods


##################
# Module classes #
##################
class Foo(object):


    def __init__(self, args):
        self.my_args = args
    ### End __init__

    def squareArg(self, arg):
        return arg**2
    ### End squareArg

    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map_async(self.squareArg, self.my_args)

        p.close()
        p.join()

        return q.get()
    ### End par_SquarArg

    @parallel
    def parSquare(self, num):
        return self.squareArg(num)
    ### End parSquare
### End Foo
### End Module classes


###########
# Testing #
###########
if __name__ == "__main__":

    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)

### End Testing

但是当我使用这种方法(愚蠢地尝试用相同的_pickle_method_unpickle_method)来增强arm pickling函数时,我首先得到一个错误,即,AttributeError: 'function' object has no attribute 'im_func',但更普遍的错误是函数不能被pickle。在

所以问题是双重的。(1) 我该如何修改decorator,使其获得的f对象是一个类的实例方法,那么它返回的executor也是该类对象的一个实例方法(这样就不会发生不能pickle的事情,因为我可以pickle这些实例方法)?(2)创建附加的_pickle_function和{}方法是否更好?我认为Python可以pickle模块级函数,所以如果我的代码没有导致executor成为一个实例方法,那么它看起来应该是一个模块级函数,但是为什么不能对它进行pickle呢?在


Tags: 方法函数nameselfreturndefargsdecorator
2条回答

好吧,这不是你要找的答案,但是Sage有一个@parallel装饰器,它与你要找的东西一脉相承。你可以在网上找到documentationsource code。在

不过,作为一般规则,在您看到失败的行之前添加import pdb;pdb.set_trace(),并检查所有可见的对象。如果您使用的是ipython,那么您可以使用%pdb魔术命令或执行along these lines操作。在

(1) How could I modify the decorator so that if the f object it takes is an instance method of a class, then the executor it returns is also an instance method of that class object (so that this business about not being able to pickle does not happen, since I can pickle those instance methods)?

>>> myfoo.parSquare
<bound method Foo.executor of <__main__.Foo object at 0x101332510>>

如您所见,parSquare实际上是executor,它已经成为一个实例方法,这并不奇怪,因为decorator是某种函数包装器。。。在

How to make a chain of function decorators?可能是对装饰者最好的描述。在

(2) Is it better to create addiitional _pickle_function and _unpickle_function methods?

您不需要python已经支持它们,事实上,copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)似乎有点奇怪,因为您使用相同的算法来pickle这两种类型。在

现在更大的问题是,为什么我们得到PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed错误本身看起来有些模糊,但它看起来好像找不到我们的函数?
我认为这是因为decorator重写了一个函数,在您的例子中,parSquare变成了{},但是{}是{}的一个内部函数,因此它不可导入,因此查找似乎失败了,这只是一种预感。

让我们试一个简单的例子。在

^{2}$

与我们得到的错误几乎相同。
请注意,上述代码相当于:

def parallel(function):                        
    def apply(values):
        from multiprocessing import Pool
        pool = Pool(4)
        result = pool.map(function, values)
        pool.close()
        pool.join()
        return result    
    return apply    

def square(value):
    return value**2

square = parallel(square)

这会产生相同的错误,同时请注意,如果我们不重命名函数。在

>>> def parallel(function):                        
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result    
...     return apply    
... 
>>> def _square(value):
...     return value**2
... 
>>> square = parallel(_square)
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>

它工作得很好,我一直在寻找一种方法来控制装饰师处理名字的方式,但是没有用,我还是想用它们进行多处理,所以我想出了一个有点难看的办法:

>>> def parallel(function):                
...     def temp(_):    
...         def apply(values):
...             from multiprocessing import Pool
...             pool = Pool(4)
...             result = pool.map(function, values)
...             pool.close()
...             pool.join()
...             return result    
...         return apply
...     return temp
... 
>>> def _square(value):
...     return value*value    
... 
>>> @parallel(_square)
... def square(values):
...     pass 
... 
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>

所以基本上我把实数函数传递给了decorator,然后我使用了第二个函数来处理这些值,正如您所看到的那样,它工作得很好。在

我稍微修改了您的初始代码以更好地处理decorator,尽管它并不完美。在

import types, copy_reg, multiprocessing as mp

def parallel(f):    
    def executor(*args):
        _pool   = mp.Pool(2)
        func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
        _result = _pool.map(func, args[1])
        _pool.close()
        _pool.join()
        return _result
    return executor

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():        
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor             
            break
        else:
            for attr in dir(cls):
                prop = getattr(cls, attr)                
                if hasattr(prop, '__call__') and prop.__name__ == func_name:
                    func = cls.__dict__[attr]
                    break
    if func == None:
        raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))        
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args
    def squareArg(self, arg):
        return arg**2
    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q    
    @parallel
    def parSquare(self, num):
        return self.squareArg(num)  

if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)  

基本上,这仍然失败,给我们AssertionError: daemonic processes are not allowed to have children因为子进程尝试调用函数,请记住,子进程并不是真正复制代码,只是名称。。。在

有一个解决方法与我前面提到的类似:

import types, copy_reg, multiprocessing as mp

def parallel(f):    
    def temp(_):
        def executor(*args):
            _pool   = mp.Pool(2)
            func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
            _result = _pool.map(func, args[1])
            _pool.close()
            _pool.join()
            return _result        
        return executor
    return temp

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():        
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor             
            break
        else:
            for attr in dir(cls):
                prop = getattr(cls, attr)                
                if hasattr(prop, '__call__') and prop.__name__ == func_name:
                    func = cls.__dict__[attr]
                    break
    if func == None:
        raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))        
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args
    def squareArg(self, arg):
        return arg**2
    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q
    def _parSquare(self, num):    
        return self.squareArg(num)
    @parallel(_parSquare)
    def parSquare(self, num):
        pass    


if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)

[1, 4, 9, 16]
[1, 4, 9, 16]

最后一件事,多线程处理时要非常小心,根据数据的分段方式,多线程处理的时间实际上比单线程慢,这主要是由于来回复制值以及创建和销毁子进程的开销所致。

总是对单线程/多线程进行基准测试,并尽可能对数据进行适当的分段。在

典型案例:

import numpy
import time
from multiprocessing import Pool

def square(value):
    return value*value

if __name__ == '__main__':
    pool = Pool(5)
    values = range(1000000)
    start = time.time()
    _ = pool.map(square, values)
    pool.close()
    pool.join()
    end = time.time()

    print "multithreaded time %f" % (end - start)
    start = time.time()
    _ = map(square, values)
    end = time.time()
    print "single threaded time %f" % (end - start)

    start = time.time()
    _ = numpy.asarray(values)**2
    end = time.time()
    print "numpy time %f" % (end - start)

    v = numpy.asarray(values)
    start = time.time()
    _ = v**2
    end = time.time()
    print "numpy without pre-initialization %f" % (end - start)

给我们:

multithreaded time 0.484441
single threaded time 0.196421
numpy time 0.184163
numpy without pre-initialization 0.004490

相关问题 更多 >