我对decorator不太熟悉,这可能比我第一个decorator项目吃得太多了,但是我想做的是制作一个parallel
decorator,它接受一个看似谦逊地应用于单个参数的函数,并自动与multiprocessing
一起分发,并将其转换为一个应用于参数列表的函数。在
我正在跟踪前面的问题this very helpful answer,这样我就可以成功地pickle类实例方法,并且可以得到类似于答案的示例来正常工作。在
这是我第一次尝试并行decorator(在参考了一些线程装饰器的web点击之后)。在
###########
# Imports #
###########
import types, copy_reg, multiprocessing as mp
import pandas, numpy as np
### End Imports
##################
# Module methods #
##################
# Parallel decorator
def parallel(f):
def executor(*args):
_pool = mp.Pool(2)
_result = _pool.map_async(f, args[1:])
# I used args[1:] because the input will be a
# class instance method, so gotta skip over the self object.
# but it seems like there ought to be a better way...
_pool.close()
_pool.join()
return _result.get()
return executor
### End parallel
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
### End _pickle_method
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
### End _unpickle_method
# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
### End Module methods
##################
# Module classes #
##################
class Foo(object):
def __init__(self, args):
self.my_args = args
### End __init__
def squareArg(self, arg):
return arg**2
### End squareArg
def par_squareArg(self):
p = mp.Pool(2) # Replace 2 with the number of processors.
q = p.map_async(self.squareArg, self.my_args)
p.close()
p.join()
return q.get()
### End par_SquarArg
@parallel
def parSquare(self, num):
return self.squareArg(num)
### End parSquare
### End Foo
### End Module classes
###########
# Testing #
###########
if __name__ == "__main__":
myfoo = Foo([1,2,3,4])
print myfoo.par_squareArg()
print myfoo.parSquare(myfoo.my_args)
### End Testing
但是当我使用这种方法(愚蠢地尝试用相同的_pickle_method
和_unpickle_method
)来增强arm pickling函数时,我首先得到一个错误,即,AttributeError: 'function' object has no attribute 'im_func'
,但更普遍的错误是函数不能被pickle。在
所以问题是双重的。(1) 我该如何修改decorator,使其获得的f
对象是一个类的实例方法,那么它返回的executor
也是该类对象的一个实例方法(这样就不会发生不能pickle的事情,因为我可以pickle这些实例方法)?(2)创建附加的_pickle_function
和{executor
成为一个实例方法,那么它看起来应该是一个模块级函数,但是为什么不能对它进行pickle呢?在
好吧,这不是你要找的答案,但是Sage有一个
@parallel
装饰器,它与你要找的东西一脉相承。你可以在网上找到documentation和source code。在不过,作为一般规则,在您看到失败的行之前添加
import pdb;pdb.set_trace()
,并检查所有可见的对象。如果您使用的是ipython
,那么您可以使用%pdb
魔术命令或执行along these lines操作。在如您所见,parSquare实际上是executor,它已经成为一个实例方法,这并不奇怪,因为decorator是某种函数包装器。。。在
How to make a chain of function decorators?可能是对装饰者最好的描述。在
您不需要python已经支持它们,事实上,
copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
似乎有点奇怪,因为您使用相同的算法来pickle这两种类型。在现在更大的问题是,为什么我们得到},但是{}是{}的一个内部函数,因此它不可导入,因此查找似乎失败了,这只是一种预感。
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
错误本身看起来有些模糊,但它看起来好像找不到我们的函数?我认为这是因为decorator重写了一个函数,在您的例子中,
parSquare
变成了{让我们试一个简单的例子。在
^{2}$与我们得到的错误几乎相同。
请注意,上述代码相当于:
这会产生相同的错误,同时请注意,如果我们不重命名函数。在
它工作得很好,我一直在寻找一种方法来控制装饰师处理名字的方式,但是没有用,我还是想用它们进行多处理,所以我想出了一个有点难看的办法:
所以基本上我把实数函数传递给了decorator,然后我使用了第二个函数来处理这些值,正如您所看到的那样,它工作得很好。在
我稍微修改了您的初始代码以更好地处理decorator,尽管它并不完美。在
基本上,这仍然失败,给我们
AssertionError: daemonic processes are not allowed to have children
因为子进程尝试调用函数,请记住,子进程并不是真正复制代码,只是名称。。。在有一个解决方法与我前面提到的类似:
最后一件事,多线程处理时要非常小心,根据数据的分段方式,多线程处理的时间实际上比单线程慢,这主要是由于来回复制值以及创建和销毁子进程的开销所致。
总是对单线程/多线程进行基准测试,并尽可能对数据进行适当的分段。在
典型案例:
给我们:
相关问题 更多 >
编程相关推荐