<blockquote>
<p>(1) How could I modify the decorator so that if the f object it takes is an instance method of a class, then the executor it returns is also an instance method of that class object (so that this business about not being able to pickle does not happen, since I can pickle those instance methods)? </p>
</blockquote>
<pre><code>>>> myfoo.parSquare
<bound method Foo.executor of <__main__.Foo object at 0x101332510>>
</code></pre>
<p>如您所见,parSquare实际上是executor,它已经成为一个实例方法,这并不奇怪,因为decorator是某种函数包装器。。。在</p>
<p><a href="https://stackoverflow.com/questions/739654/understanding-python-decorators">How to make a chain of function decorators?</a>可能是对装饰者最好的描述。在</p>
<blockquote>
<p>(2) Is it better to create additional _pickle_function and _unpickle_function methods?</p>
</blockquote>
<p>您不需要python已经支持它们,事实上,<code>copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)</code>似乎有点奇怪,因为您使用相同的算法来pickle这两种类型。在</p>
<p>现在更大的问题是,为什么我们得到<code>PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed</code>错误本身看起来有些模糊,但它看起来好像找不到我们的函数?<br/>
我认为这是因为decorator重写了一个函数,在您的例子中,<code>parSquare</code>变成了{<cd4>},但是{<cd4>}是{<cd6>}的一个内部函数,因此它不可导入,因此查找似乎失败了,这只是一种预感。<br/></p>
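<p>The hunch can be checked directly: pickle serializes plain functions by reference, i.e. by their module-level name, so a function created inside another function can never be pickled. A quick sketch (the exception type and wording differ between Python 2 and 3, but the cause is the same):</p>

```python
import pickle

def outer():
    def inner(x):
        return x**2
    return inner

f = outer()  # a function with no module-level name

try:
    pickle.dumps(f)
    raised = False
except (pickle.PicklingError, AttributeError):
    # Python 3 reports: Can't pickle local object 'outer.<locals>.inner'
    raised = True

print(raised)  # True
```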
<p>让我们试一个简单的例子。在</p>
<pre><code>>>> def parallel(function):
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result
...     return apply
...
>>> @parallel
... def square(value):
...     return value**2
...
>>> square([1,2,3,4])
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
</code></pre>
<p>与我们得到的错误几乎相同。<br/>
请注意,上述代码相当于:</p>
<pre><code>def parallel(function):
    def apply(values):
        from multiprocessing import Pool
        pool = Pool(4)
        result = pool.map(function, values)
        pool.close()
        pool.join()
        return result
    return apply

def square(value):
    return value**2

square = parallel(square)
</code></pre>
<p>这会产生相同的错误,同时请注意,如果我们不重命名函数。在</p>
<pre><code>>>> def parallel(function):
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result
...     return apply
...
>>> def _square(value):
...     return value**2
...
>>> square = parallel(_square)
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>
</code></pre>
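<p>This works because pickle verifies that looking the function's name up again yields the very same object; the moment a decorator rebinds the name, that identity check fails. A minimal sketch of the failure mode (the lambda stands in for the decorator's wrapper):</p>

```python
import pickle

def square(value):
    return value**2

original = square
square = lambda v: original(v)  # rebind the module-level name, as a decorator would

try:
    pickle.dumps(original)  # lookup finds the wrapper, not `original`
    failed = False
except pickle.PicklingError:
    # "it's not the same object as __main__.square"
    failed = True

print(failed)  # True
```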
<p>它工作得很好,我一直在寻找一种方法来控制装饰师处理名字的方式,但是没有用,我还是想用它们进行多处理,所以我想出了一个有点难看的办法:</p>
<pre><code>>>> def parallel(function):
...     def temp(_):
...         def apply(values):
...             from multiprocessing import Pool
...             pool = Pool(4)
...             result = pool.map(function, values)
...             pool.close()
...             pool.join()
...             return result
...         return apply
...     return temp
...
>>> def _square(value):
...     return value*value
...
>>> @parallel(_square)
... def square(values):
...     pass
...
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>
</code></pre>
<p>所以基本上我把实数函数传递给了decorator,然后我使用了第二个函数来处理这些值,正如您所看到的那样,它工作得很好。在</p>
<p>我稍微修改了您的初始代码以更好地处理decorator,尽管它并不完美。在</p>
<pre><code>import types, copy_reg, multiprocessing as mp

def parallel(f):
    def executor(*args):
        _pool = mp.Pool(2)
        func = getattr(args[0], f.__name__)  # Get the actual bound method so we can use our own pickling procedure
        _result = _pool.map(func, args[1])
        _pool.close()
        _pool.join()
        return _result
    return executor

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name]  # This will fail with the decorator, since parSquare has been wrapped as executor
            break
    else:
        for attr in dir(cls):
            prop = getattr(cls, attr)
            if hasattr(prop, '__call__') and prop.__name__ == func_name:
                func = cls.__dict__[attr]
                break
    if func is None:
        raise KeyError("Couldn't find function %s within %s" % (str(func_name), str(cls)))
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args

    def squareArg(self, arg):
        return arg**2

    def par_squareArg(self):
        p = mp.Pool(2)  # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q

    @parallel
    def parSquare(self, num):
        return self.squareArg(num)

if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)
</code></pre>
<p>基本上,这仍然失败,给我们<code>AssertionError: daemonic processes are not allowed to have children</code>因为子进程尝试调用函数,请记住,子进程并不是真正复制代码,只是名称。。。在</p>
<p>有一个解决方法与我前面提到的类似:</p>
<pre><code>import types, copy_reg, multiprocessing as mp

def parallel(f):
    def temp(_):
        def executor(*args):
            _pool = mp.Pool(2)
            func = getattr(args[0], f.__name__)  # Get the actual bound method so we can use our own pickling procedure
            _result = _pool.map(func, args[1])
            _pool.close()
            _pool.join()
            return _result
        return executor
    return temp

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name]
            break
    else:
        for attr in dir(cls):
            prop = getattr(cls, attr)
            if hasattr(prop, '__call__') and prop.__name__ == func_name:
                func = cls.__dict__[attr]
                break
    if func is None:
        raise KeyError("Couldn't find function %s within %s" % (str(func_name), str(cls)))
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args

    def squareArg(self, arg):
        return arg**2

    def par_squareArg(self):
        p = mp.Pool(2)  # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q

    def _parSquare(self, num):
        return self.squareArg(num)

    @parallel(_parSquare)
    def parSquare(self, num):
        pass

if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)

[1, 4, 9, 16]
[1, 4, 9, 16]
</code></pre>
<p>最后一件事,多线程处理时要非常小心,根据数据的分段方式,多线程处理的时间实际上比单线程慢,这主要是由于来回复制值以及创建和销毁子进程的开销所致。<br/></p>
<p>总是对单线程/多线程进行基准测试,并尽可能对数据进行适当的分段。在</p>
<p>典型案例:</p>
<pre><code>import numpy
import time
from multiprocessing import Pool

def square(value):
    return value*value

if __name__ == '__main__':
    pool = Pool(5)
    values = range(1000000)
    start = time.time()
    _ = pool.map(square, values)
    pool.close()
    pool.join()
    end = time.time()
    print "multithreaded time %f" % (end - start)
    start = time.time()
    _ = map(square, values)
    end = time.time()
    print "single threaded time %f" % (end - start)
    start = time.time()
    _ = numpy.asarray(values)**2
    end = time.time()
    print "numpy time %f" % (end - start)
    v = numpy.asarray(values)
    start = time.time()
    _ = v**2
    end = time.time()
    print "numpy without pre-initialization %f" % (end - start)
</code></pre>
<p>给我们:</p>
<pre><code>multithreaded time 0.484441
single threaded time 0.196421
numpy time 0.184163
numpy without pre-initialization 0.004490
</code></pre>