Python functions in separate processes. Can the wrapper functions be unified?
I'm new to Python and would like to know how to implement the following more efficiently. I have functions f1, f2, ... fN. These functions are "wrappers": each one starts a new process (with a corresponding target _f1, _f2, ... _fN), passes its arguments (arg1, arg2, ...) to the child process, and receives the return value.
I want this code to execute in a process different from the caller's (the user of the module).
The functions f1, f2, ... fN (and the corresponding _f1, _f2, ... _fN) may have different argument signatures.
In a module:
from multiprocessing import Process, Pipe

def _f1(arg1, arg2, ... argn, connection):
    ...
    connection.send(return_value)
    connection.close()

def f1(arg1, arg2, ... argn):
    parent_conn, child_conn = Pipe()
    p = Process(target=_f1, args=(arg1, arg2, ... argn, child_conn))
    p.start()
    p.join()
    return parent_conn.recv()

def _f2(arg1, arg2, ... argm, connection):
    ...
    connection.send(return_value)
    connection.close()

def f2(arg1, arg2, ... argm):
    parent_conn, child_conn = Pipe()
    p = Process(target=_f2, args=(arg1, arg2, ... argm, child_conn))
    p.start()
    p.join()
    return parent_conn.recv()

...

def _fN(arg1, arg2, ... argk, connection):
    ...
    connection.send(return_value)
    connection.close()

def fN(arg1, arg2, ... argk):
    parent_conn, child_conn = Pipe()
    p = Process(target=_fN, args=(arg1, arg2, ... argk, child_conn))
    p.start()
    p.join()
    return parent_conn.recv()
Obviously, the wrapper functions f1, f2, ... fN are all alike. Can I implement them as a single wrapper function? I would like the execution not to block: a user of the module should be able to run, say, f1 and f2 concurrently.
I hope I have managed to state my question clearly.
Here is a concrete example with two functions, sum() and sin():
import math
from multiprocessing import Process, Pipe

def _sum(a, b, connection):
    return_value = a + b
    connection.send(return_value)
    connection.close()

def sum(a, b):
    parent_conn, child_conn = Pipe()
    p = Process(target=_sum, args=(a, b, child_conn))
    p.start()
    p.join()
    return parent_conn.recv()

def _sin(x, connection):
    return_value = math.sin(x)  # math.sin, not the wrapper below, to avoid recursion
    connection.send(return_value)
    connection.close()

def sin(x):
    parent_conn, child_conn = Pipe()
    p = Process(target=_sin, args=(x, child_conn))
    p.start()
    p.join()
    return parent_conn.recv()
Following srj's suggestion to use decorators, I arrived at the solution below. I then tried to take it one step further and decorate the connection.send(return_value) and connection.close() part as well, but that doesn't work for me. The code is below; the comments mark which parts work and which (as far as I can tell) don't. Any help?
from multiprocessing import Process, Pipe

def process_wrapper1(func):
    def wrapper(*args):
        parent_conn, child_conn = Pipe()
        f_args = args + (child_conn,)
        p = Process(target=func, args=f_args)
        p.start()
        p.join()
        return parent_conn.recv()
    return wrapper

def process_wrapper2(func):
    def wrapper(*args):
        res = func(*args[:-1])   # all arguments except the trailing connection
        args[-1].send(res)
        args[-1].close()
    return wrapper

#def _sum(a, b, connection):    # Working
#    return_value = a + b
#    connection.send(return_value)
#    connection.close()

def __sum(a, b):                # Doesn't work, see the error below
    return a + b

_sum = process_wrapper2(__sum)
sum = process_wrapper1(_sum)
In Pyzo's IPython shell, the code above produces the following:
In [3]: import test1
In [4]: test1.sum(2,3)
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-4-8c542dc5e11a> in <module>()
----> 1 test1.sum(2,3)
C:\projects\PYnGUInLib\test1.py in wrapper(*args)
11 f_args = (child_conn,) + args
12 p = Process(target=func, args=f_args)
---> 13 p.start()
14 p.join()
15 return parent_conn.recv()
C:\pyzo2014a_64b\lib\multiprocessing\process.py in start(self)
103 'daemonic processes are not allowed to have children'
104 _cleanup()
--> 105 self._popen = self._Popen(self)
106 self._sentinel = self._popen.sentinel
107 _children.add(self)
C:\pyzo2014a_64b\lib\multiprocessing\context.py in _Popen(process_obj)
210 @staticmethod
211 def _Popen(process_obj):
--> 212 return _default_context.get_context().Process._Popen(process_obj)
213
214 class DefaultContext(BaseContext):
C:\pyzo2014a_64b\lib\multiprocessing\context.py in _Popen(process_obj)
311 def _Popen(process_obj):
312 from .popen_spawn_win32 import Popen
--> 313 return Popen(process_obj)
314
315 class SpawnContext(BaseContext):
C:\pyzo2014a_64b\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
64 try:
65 reduction.dump(prep_data, to_child)
---> 66 reduction.dump(process_obj, to_child)
67 finally:
68 context.set_spawning_popen(None)
C:\pyzo2014a_64b\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
57 def dump(obj, file, protocol=None):
58 '''Replacement for pickle.dump() using ForkingPickler.'''
---> 59 ForkingPickler(file, protocol).dump(obj)
60
61 #
PicklingError: Can't pickle <function process_wrapper2.<locals>.wrapper at 0x0000000005541048>: attribute lookup wrapper on test1 failed
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\pyzo2014a_64b\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\pyzo2014a_64b\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
EOFError: Ran out of input
In [5]:
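A note on the PicklingError above: on Windows, multiprocessing uses the spawn start method, which pickles the Process target by reference (module name plus qualified name). The nested wrapper returned by process_wrapper2 is not reachable as an attribute of test1 under its own __qualname__, hence "attribute lookup wrapper on test1 failed". A minimal sketch of one possible fix, assuming the decorator is applied with @ syntax so that, via functools.wraps, the module attribute _sum and the wrapper's __qualname__ coincide:

    import functools
    from multiprocessing import Process, Pipe

    def process_wrapper2(func):
        @functools.wraps(func)  # copies __name__/__qualname__, so pickle can find the wrapper by reference
        def wrapper(*args):
            res = func(*args[:-1])
            args[-1].send(res)
            args[-1].close()
        return wrapper

    @process_wrapper2           # test1._sum is now the wrapper itself, stored under the name '_sum'
    def _sum(a, b):
        return a + b

    sum = process_wrapper1(_sum)  # _sum can now be pickled, so Process(target=_sum, ...) works

With the assignment style _sum = process_wrapper2(__sum), this does not help, because the wrapper's __qualname__ would then point at __sum, which is not the wrapper.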
2 Answers
You should use multiprocessing.Pool. Here is an example:
import multiprocessing

def f1(*args):
    rv = do_calculations()
    return rv

def f2(*args):
    ...

...

def fN(*args):
    ...

def worker(args):
    fn = args[0]
    return fn(*args[1:])

inputs = [
    [f1, f1_args],
    [f2, f2_args],
    ...
    [fN, fN_args]
]

pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
results = pool.map(worker, inputs)
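The question also asks for non-blocking execution: pool.map blocks until every result is in, whereas apply_async returns immediately with an AsyncResult handle. A minimal runnable sketch (add and sine are illustrative stand-ins for the question's sum and sin, renamed to avoid shadowing the built-in and math.sin):

    import math
    import multiprocessing

    def add(a, b):          # top-level functions pickle cleanly under every start method
        return a + b

    def sine(x):
        return math.sin(x)

    if __name__ == '__main__':
        with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
            r1 = pool.apply_async(add, (2, 3))    # returns immediately
            r2 = pool.apply_async(sine, (1.0,))   # runs concurrently with add
            print(r1.get(), r2.get())             # .get() blocks only when the values are needed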
You can wrap the function with what's called a decorator, so that the boilerplate of creating the process and executing it is handled automatically.
from multiprocessing import Process, Pipe

def process_wrapper(func):
    def wrapper(*args):
        parent_conn, child_conn = Pipe()
        # attach the connection to the arguments
        f_args = args + (child_conn,)
        p = Process(target=func, args=f_args)
        p.start()
        p.join()
        return parent_conn.recv()
    return wrapper
and then define the functions like this:
@process_wrapper
def _f2(arg1, arg2, ... argm, connection):
    ...
    connection.send(return_value)
    connection.close()
Explanation: process_wrapper takes a function of N positional arguments, of which the last is always a pipe connection, and returns a new function of N-1 arguments with the connection pre-filled.
With your concrete functions,
import math

@process_wrapper
def sin(x, connection):
    return_value = math.sin(x)  # call math.sin explicitly; the name sin now refers to the wrapped function
    connection.send(return_value)
    connection.close()

@process_wrapper
def sum(a, b, connection):
    return_value = a + b
    connection.send(return_value)
    connection.close()
and call them like ordinary functions:
    sum(a, b)
For more background on Python decorators, see http://www.jeffknupp.com/blog/2013/11/29/improve-your-python-decorators-explained/
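A side note on the question's non-blocking requirement: the p.join() inside process_wrapper blocks until the child exits, so f1 and f2 cannot overlap. A sketch of a variant (AsyncCall and process_wrapper_async are illustrative names, not part of any library) that returns a handle immediately and defers the blocking recv() until the caller asks for the result:

    from multiprocessing import Process, Pipe

    class AsyncCall:
        """Handle for a call running in a child process."""
        def __init__(self, process, connection):
            self._process = process
            self._connection = connection

        def result(self):
            value = self._connection.recv()  # blocks only here, when the value is needed
            self._process.join()
            return value

    def process_wrapper_async(func):
        def wrapper(*args):
            parent_conn, child_conn = Pipe()
            p = Process(target=func, args=args + (child_conn,))
            p.start()                        # no join(): return to the caller immediately
            return AsyncCall(p, parent_conn)
        return wrapper

    def _sum(a, b, connection):
        connection.send(a + b)
        connection.close()

    sum_async = process_wrapper_async(_sum)  # _sum stays a plain top-level function, so spawn can pickle it

With this, h1 = sum_async(2, 3) and h2 = sum_async(4, 5) run in two children concurrently, and h1.result() + h2.result() collects both values.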