在GUI应用中使用多进程进行异步调用
我有一个图形用户界面(GUI)应用程序,需要在主界面循环之外,从网络获取和解析各种资源。
我查找了使用 Python 的多进程模块的选项,因为这些获取操作不仅涉及到阻塞的输入输出,还包括大量的解析工作,所以多进程可能比 Python 的线程更合适。
使用 Twisted 会很简单,但这次不考虑使用 Twisted。
我在这里找到一个简单的解决方案:
问题是,回调函数在主线程中没有被调用。
所以我想出了以下解决方案:
delegate.py
import os
import multiprocessing as mp
import signal
from collections import namedtuple
import uuid
import logging
_CALLBACKS = {}
_QUEUE = mp.Queue()
info = logging.getLogger(__name__).info
class Call(namedtuple('Call', 'id finished result error')):
def attach(self, func):
if not self.finished:
_CALLBACKS.setdefault(self.id, []).append(func)
else:
func(self.result or self.error)
return self
def callback(self):
assert self.finished, 'Call not finished yet'
r = self.result or self.error
for func in _CALLBACKS.pop(self.id, []):
func(r)
def done(self, result=None, error=None):
assert not self.finished, 'Call already finished'
return self._replace(finished=(-1 if error else 1),
result=result, error=error)
@classmethod
def create(clss):
call = clss(uuid.uuid4().hex, 0, None, None) # uuid ???
return call
def run(q, cb, func, args=None, kwargs=None):
info('run: try running %s' % func)
try:
cb = cb.done(result=func(*(args or ()), **(kwargs or {})))
except Exception, err:
cb = cb.done(error=err)
q.put(cb)
os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ???
info('run: leaving')
def on_callback(sig, frame):
info('on_callback: checking queue ...')
c = _QUEUE.get(True, 2)
info('on_callback: got call - %s' % repr(c))
c.callback()
signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ???
def delegate(func, *args, **kwargs):
info('delegate: %s %s' % (func, args,))
cb = Call.create()
mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
return cb
__all__ = ['delegate']
用法
from delegate import delegate
def sleeper(secs):
assert secs >= 1, 'I need my Augenpflege'
info('sleeper: will go to sleep for %s secs' % secs)
sleep(secs)
info('sleeper: woke up - returning result')
return ['sleeper', 'result']
def on_sleeper_result(r):
if isinstance(r, Exception):
info('on_sleeper_result: got error: %s' % r)
else:
info('on_sleeper_result: got result: %s' % r)
from delegate import delegate
delegate(sleeper, 3).attach(on_sleeper_result)
delegate(sleeper, -3).attach(on_sleeper_result)
while 1:
info('main: loop')
sleep(1)
输出
0122 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (3,)
0123 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (-3,)
0124 08437 MainThread INFO run: try running <function sleeper at 0x163e320>
0124 08437 MainThread INFO sleeper: will go to sleep for 3 secs
0124 08432 MainThread INFO main: loop
0125 08438 MainThread INFO run: try running <function sleeper at 0x163e320>
0126 08438 MainThread INFO run: leaving
0126 08432 MainThread INFO on_callback: checking queue ...
0126 08432 MainThread INFO on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',))
0127 08432 MainThread INFO on_sleeper_result: got error: I need my Augenpflege
0127 08432 MainThread INFO main: loop
1128 08432 MainThread INFO main: loop
2129 08432 MainThread INFO main: loop
3127 08437 MainThread INFO sleeper: woke up - returning result
3128 08437 MainThread INFO run: leaving
3128 08432 MainThread INFO on_callback: checking queue ...
3129 08432 MainThread INFO on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None)
3129 08432 MainThread INFO on_sleeper_result: got result: ['sleeper', 'result']
3129 08432 MainThread INFO main: loop
4130 08432 MainThread INFO main: loop
5132 08432 MainThread INFO main: loop
...
到目前为止,这个方法运行得很好,但我对多进程模块的经验有限,有点不确定这样运行是否会有问题。我的问题是:在使用多进程时,有哪些特别需要注意的地方……或者有没有更合适的方式来实现异步回调机制,使用 Python 标准库?
2 个回答
self.finished = -1 if error else 1
你的代码可以运行,但其实可以更简单一些。我们来逐步分析一下这段代码。
这段代码在主进程中创建了一个 Call
实例:
def delegate(func, *args, **kwargs):
cb = Call.create()
但是当你把 cb
传给工作进程时,
mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
这个 Call
实例在 os.fork
的过程中被复制了,因此创建了一个新的、独立的实例。然后它调用了 cb.done
,接着又调用了 cb._replace
,这又返回了第三个 Call
实例:
def done(self, result=None, error=None):
assert not self.finished, 'Call already finished'
return self._replace(finished=(-1 if error else 1),
result=result, error=error)
上面的调用了一个私有的方法 _replace
。如果 Call
是一个 object
的子类,而不是 namedtuple
的子类,那本来可以用简单的 Python 语句来实现。
使用 namedtuple
作为基类虽然在 __init__
中省了点代码,但后面就变得有点麻烦,因为我们需要修改 namedtuple
的属性……
与此同时,主进程中通过 delegate(...)
返回的原始 Call
实例调用了 attach
:
delegate(...).attach(on_sleeper_result)
这会修改全局的 _CALLBACKS
字典。工作进程并不知道 _CALLBACKS
的这个变化;在工作进程中,_CALLBACKS
仍然是一个空字典。所以你必须通过 mp.Queue
将工作进程中的 Call
实例传回主进程,这样才能用 cb.id
来引用 _CALLBACKS
中正确的函数。
所以这一切都能正常工作,但每次调用 delegate
时都会创建三个 Call
实例,这可能会让不太了解的人误以为这三个 Call
实例是同一个对象……虽然一切都能正常运行,但确实有点复杂。
你有没有考虑过使用 mp.Pool.apply_async
的 callback
参数呢?
import multiprocessing as mp
import logging
import time
import collections
_CALLBACKS=collections.defaultdict(list)
logger=mp.log_to_stderr(logging.DEBUG)
def attach(name,func):
_CALLBACKS[name].append(func)
def delegate(func, *args, **kwargs):
id=kwargs.pop('id')
try:
result=func(*args,**kwargs)
except Exception, err:
result=err
return (id,result)
def sleeper(secs):
assert secs >= 1, 'I need my Augenpflege'
logger.info('sleeper: will go to sleep for %s secs' % secs)
time.sleep(secs)
logger.info('sleeper: woke up - returning result')
return ['sleeper', 'result']
def callback(r):
id,result=r
for func in _CALLBACKS[id]:
func(result)
def on_sleeper_result(r):
if isinstance(r, Exception):
logger.error('on_sleeper_result: got error: %s' % r)
else:
logger.info('on_sleeper_result: got result: %s' % r)
if __name__=='__main__':
pool=mp.Pool()
pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1},
callback=callback)
attach(1,on_sleeper_result)
pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2},
callback=callback)
attach(2,on_sleeper_result)
while 1:
logger.info('main: loop')
time.sleep(1)
你没有必要使用信号(低级接口)来进行Python的多进程处理,也不需要在主循环中忙着等待。
你需要在一个QThread
中运行你(修改过的)事件循环,这样它就可以直接调用Qt的代码,或者使用QApplication.postEvent(或者pyqtSignal)来在主线程中执行它。
# this should be in the delegate module
while 1:
c = _QUEUE.get(True) # no timeout
c.callback() # or post event to main thread
你还可以查看这个页面,了解在Qt中线程之间的通信讨论。