PyQt中的异步模式?或者更简洁的后台调用模式?
我正在尝试写一个简单的(一个文件的pyqt)程序,希望它能响应用户操作(也就是说,我不想依赖那些需要额外安装的库,尤其是那些我不能直接放进文件里的库,因为这样会带来一些麻烦,不过我还是可能会尝试一下)。
我想在一个工作线程上执行可能需要较长时间的(并且可以取消的)操作(实际上,后台操作有一个锁,以防止同时进行多个操作,因为它使用的库一次只能调用一个,而且有超时设置,所以启动多个线程也是可以的)。
根据我了解的,使用qt的“基本”方式是这样的。
(注意,代码没有经过测试,所以可能有错误)
class MainWindow(QWidget):
#self.worker moved to background thread
def initUI(self):
...
self.cmd_button.clicked.connect(self.send)
...
@pyqtslot()
def send(self):
...
...#get cmd from gui
QtCore.QTimer.singleShot(0, lambda : self.worker(cmd))
@pyqtslot(str)
def end_send(self, result):
...
...# set some gui to display result
...
class WorkerObject(QObject):
def send_cmd(self, cmd):
... get result of cmd
QtCore.QTimer.singleShot(0, lambda: self.main_window.end_send())
(我使用QTimer的方式对吗?它是在不同的线程上运行吗?)
我其实更希望有一些更简单、更抽象的方式,像c#的async那样。
(注意,我没有使用过asyncio,所以可能有些地方理解错了)
class MainWindow(QWidget):
...
@asyncio.coroutine
def send(self):
...
...#get cmd from gui
result = yield from self.worker(cmd)
#set gui textbox to result
class WorkerObject(QObject):
@asyncio.coroutine
def send_cmd(self, cmd):
... get result of cmd
yield from loop.run_in_executor(None, self.model.send_command, cmd)
我听说python 3有类似的功能,还有一个回归版本,但它和qt一起使用时能正常工作吗?
如果有人知道其他更合理的模式,那也会是有用的,或者是可以接受的答案。
3 个回答
要运行一个比较复杂的事件驱动工作对象,依赖于QThreads可能会太复杂了。这里有两种方法可以让你一次性调用某个处理方法:
第一种方法是用QtConcurrent()
。如果你只是想运行一个耗时的函数,这个方法会比较合适。不过,不确定在pyqt中是否能用这个。
第二种方法是创建一个QThread的子类,并在这个子类的run()
方法中实现你的处理代码。然后只需要调用QThreadSubclass.start()
就可以了。这种方法在PyQt中应该是可用的,可能是更好的选择。这样复杂度就降低到一个简单的子类了。与线程之间的通信也很简单,就像和其他类沟通一样。
如果你使用一个分配给QThread的对象,这可能不是最好的方法,那么你应该使用Qt.QueuedConnection来发出信号,而不是用QTimer。使用QueuedConnection可以确保槽函数在对象所在的线程中运行。
如果你想要一个非常简单的方法来实现这个功能,可以创建一个QThread(线程),然后用一个pyqtSignal
来通知父级线程,当这个线程完成时。这里有两个按钮。一个按钮控制一个可以取消的后台线程。第一次点击按钮会启动这个线程,第二次点击则会取消这个后台线程。另一个按钮在后台线程运行时会自动禁用,等线程完成后再重新启用。
from PyQt4 import QtGui
from PyQt4 import QtCore
class MainWindow(QtGui.QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.initUI()
self.task = None
def initUI(self):
self.cmd_button = QtGui.QPushButton("Push/Cancel", self)
self.cmd_button2 = QtGui.QPushButton("Push", self)
self.cmd_button.clicked.connect(self.send_cancellable_evt)
self.cmd_button2.clicked.connect(self.send_evt)
self.statusBar()
self.layout = QtGui.QGridLayout()
self.layout.addWidget(self.cmd_button, 0, 0)
self.layout.addWidget(self.cmd_button2, 0, 1)
widget = QtGui.QWidget()
widget.setLayout(self.layout)
self.setCentralWidget(widget)
self.show()
def send_evt(self, arg):
self.t1 = RunThread(self.worker, self.on_send_finished, "test")
self.t2 = RunThread(self.worker, self.on_send_finished, 55)
print("kicked off async tasks, waiting for it to be done")
def worker(self, inval):
print "in worker, received '%s'" % inval
time.sleep(2)
return inval
def send_cancellable_evt(self, arg):
if not self.task:
self.task = RunCancellableThread(None, self.on_csend_finished, "test")
print("kicked off async task, waiting for it to be done")
else:
self.task.cancel()
print("Cancelled async task.")
def on_csend_finished(self, result):
self.task = None # Allow the worker to be restarted.
print "got %s" % result
def on_send_finished(self, result):
print "got %s. Type is %s" % (result, type(result))
class RunThread(QtCore.QThread):
""" Runs a function in a thread, and alerts the parent when done.
Uses a pyqtSignal to alert the main thread of completion.
"""
finished = QtCore.pyqtSignal(["QString"], [int])
def __init__(self, func, on_finish, *args, **kwargs):
super(RunThread, self).__init__()
self.args = args
self.kwargs = kwargs
self.func = func
self.finished.connect(on_finish)
self.finished[int].connect(on_finish)
self.start()
def run(self):
try:
result = self.func(*self.args, **self.kwargs)
except Exception as e:
print "e is %s" % e
result = e
finally:
if isinstance(result, int):
self.finished[int].emit(result)
else:
self.finished.emit(str(result)) # Force it to be a string by default.
class RunCancellableThread(RunThread):
def __init__(self, *args, **kwargs):
self.cancelled = False
super(RunCancellableThread, self).__init__(*args, **kwargs)
def cancel(self):
self.cancelled = True # Use this if you just want to signal your run() function.
# Use this to ungracefully stop the thread. This isn't recommended,
# especially if you're doing any kind of work in the thread that could
# leave things in an inconsistent or corrupted state if suddenly
# terminated
#self.terminate()
def run(self):
try:
start = cur_time = time.time()
while cur_time - start < 10:
if self.cancelled:
print("cancelled")
result = "cancelled"
break
print "doing work in worker..."
time.sleep(1)
cur_time = time.time()
except Exception as e:
print "e is %s" % e
result = e
finally:
if isinstance(result, int):
self.finished[int].emit(result)
else:
self.finished.emit(str(result)) # Force it to be a string by default.
if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
m = MainWindow()
sys.exit(app.exec_())
点击“推送”后的输出:
in worker, received 'test'kicked off async tasks, waiting for it to be done
in worker, received '55'
got 55. Type is <type 'int'>
got test. Type is <class 'PyQt4.QtCore.QString'>
in worker, received 'test'
in worker, received '55'
点击“推送/取消”后的输出:
kicked off async task, waiting for it to be done
doing work in worker...
doing work in worker...
doing work in worker...
doing work in worker...
doing work in worker...
doing work in worker...
<I pushed the button again here>
Cancelled async task.
cancelled
got cancelled
这里有几个烦人的限制:
finished
信号在处理不同类型的数据时不太方便。你需要明确声明并连接每种你想返回的数据类型的处理函数,然后确保在得到结果时发出信号给正确的处理函数。这种情况被称为信号的重载。有些Python类型在C++中有相同的签名,在一起使用时可能会出现问题,比如pyqtSignal([dict], [list])
。为了处理你在线程中运行的不同类型的数据,可能更简单的方法是创建几个不同的QThread
子类。- 你需要保存你创建的
RunThread
对象的引用,否则当它超出作用域时会立即被销毁,这样会在它完成之前终止线程中的工作。这有点浪费,因为在你完成后,你还要保留对已完成的QThread
对象的引用(除非你在on_finish
处理函数中清理,或者通过其他方式处理)。
你问的这个问题(“在PyQt中有没有办法使用类似asyncio
的模式?”)的简短回答是有,但其实挺复杂的,对于小程序来说可能不太值得。下面是一些原型代码,可以让你使用你描述的异步模式:
import types
import weakref
from functools import partial
from PyQt4 import QtGui
from PyQt4 import QtCore
from PyQt4.QtCore import QThread, QTimer
## The following code is borrowed from here:
# http://stackoverflow.com/questions/24689800/async-like-pattern-in-pyqt-or-cleaner-background-call-pattern
# It provides a child->parent thread-communication mechanism.
class ref(object):
"""
A weak method implementation
"""
def __init__(self, method):
try:
if method.im_self is not None:
# bound method
self._obj = weakref.ref(method.im_self)
else:
# unbound method
self._obj = None
self._func = method.im_func
self._class = method.im_class
except AttributeError:
# not a method
self._obj = None
self._func = method
self._class = None
def __call__(self):
"""
Return a new bound-method like the original, or the
original function if refers just to a function or unbound
method.
Returns None if the original object doesn't exist
"""
if self.is_dead():
return None
if self._obj is not None:
# we have an instance: return a bound method
return types.MethodType(self._func, self._obj(), self._class)
else:
# we don't have an instance: return just the function
return self._func
def is_dead(self):
"""
Returns True if the referenced callable was a bound method and
the instance no longer exists. Otherwise, return False.
"""
return self._obj is not None and self._obj() is None
def __eq__(self, other):
try:
return type(self) is type(other) and self() == other()
except:
return False
def __ne__(self, other):
return not self == other
class proxy(ref):
"""
Exactly like ref, but calling it will cause the referent method to
be called with the same arguments. If the referent's object no longer lives,
ReferenceError is raised.
If quiet is True, then a ReferenceError is not raise and the callback
silently fails if it is no longer valid.
"""
def __init__(self, method, quiet=False):
super(proxy, self).__init__(method)
self._quiet = quiet
def __call__(self, *args, **kwargs):
func = ref.__call__(self)
if func is None:
if self._quiet:
return
else:
raise ReferenceError('object is dead')
else:
return func(*args, **kwargs)
def __eq__(self, other):
try:
func1 = ref.__call__(self)
func2 = ref.__call__(other)
return type(self) == type(other) and func1 == func2
except:
return False
class CallbackEvent(QtCore.QEvent):
"""
A custom QEvent that contains a callback reference
Also provides class methods for conveniently executing
arbitrary callback, to be dispatched to the event loop.
"""
EVENT_TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
def __init__(self, func, *args, **kwargs):
super(CallbackEvent, self).__init__(self.EVENT_TYPE)
self.func = func
self.args = args
self.kwargs = kwargs
def callback(self):
"""
Convenience method to run the callable.
Equivalent to:
self.func(*self.args, **self.kwargs)
"""
self.func(*self.args, **self.kwargs)
@classmethod
def post_to(cls, receiver, func, *args, **kwargs):
"""
Post a callable to be delivered to a specific
receiver as a CallbackEvent.
It is the responsibility of this receiver to
handle the event and choose to call the callback.
"""
# We can create a weak proxy reference to the
# callback so that if the object associated with
# a bound method is deleted, it won't call a dead method
if not isinstance(func, proxy):
reference = proxy(func, quiet=True)
else:
reference = func
event = cls(reference, *args, **kwargs)
# post the event to the given receiver
QtGui.QApplication.postEvent(receiver, event)
## End borrowed code
## Begin Coroutine-framework code
class AsyncTask(QtCore.QObject):
""" Object used to manage asynchronous tasks.
This object should wrap any function that you want
to call asynchronously. It will launch the function
in a new thread, and register a listener so that
`on_finished` is called when the thread is complete.
"""
def __init__(self, func, *args, **kwargs):
super(AsyncTask, self).__init__()
self.result = None # Used for the result of the thread.
self.func = func
self.args = args
self.kwargs = kwargs
self.finished = False
self.finished_cb_ran = False
self.finished_callback = None
self.objThread = RunThreadCallback(self, self.func, self.on_finished,
*self.args, **self.kwargs)
self.objThread.start()
def customEvent(self, event):
event.callback()
def on_finished(self, result):
""" Called when the threaded operation is complete.
Saves the result of the thread, and
executes finished_callback with the result if one
exists. Also closes/cleans up the thread.
"""
self.finished = True
self.result = result
if self.finished_callback:
self.finished_ran = True
func = partial(self.finished_callback, result)
QTimer.singleShot(0, func)
self.objThread.quit()
self.objThread.wait()
class RunThreadCallback(QtCore.QThread):
""" Runs a function in a thread, and alerts the parent when done.
Uses a custom QEvent to alert the main thread of completion.
"""
def __init__(self, parent, func, on_finish, *args, **kwargs):
super(RunThreadCallback, self).__init__(parent)
self.on_finished = on_finish
self.func = func
self.args = args
self.kwargs = kwargs
def run(self):
try:
result = self.func(*self.args, **self.kwargs)
except Exception as e:
print "e is %s" % e
result = e
finally:
CallbackEvent.post_to(self.parent(), self.on_finished, result)
def coroutine(func):
""" Coroutine decorator, meant for use with AsyncTask.
This decorator must be used on any function that uses
the `yield AsyncTask(...)` pattern. It shouldn't be used
in any other case.
The decorator will yield AsyncTask objects from the
decorated generator function, and register itself to
be called when the task is complete. It will also
excplicitly call itself if the task is already
complete when it yields it.
"""
def wrapper(*args, **kwargs):
def execute(gen, input=None):
if isinstance(gen, types.GeneratorType):
if not input:
obj = next(gen)
else:
try:
obj = gen.send(input)
except StopIteration as e:
result = getattr(e, "value", None)
return result
if isinstance(obj, AsyncTask):
# Tell the thread to call `execute` when its done
# using the current generator object.
func = partial(execute, gen)
obj.finished_callback = func
if obj.finished and not obj.finished_cb_ran:
obj.on_finished(obj.result)
else:
raise Exception("Using yield is only supported with AsyncTasks.")
else:
print("result is %s" % result)
return result
result = func(*args, **kwargs)
execute(result)
return wrapper
## End coroutine-framework code
如果你把上面的代码放到一个模块里(比如叫qtasync.py
),你就可以在脚本中导入它,然后像这样使用,以获得类似asyncio
的行为:
import sys
import time
from qtasync import AsyncTask, coroutine
from PyQt4 import QtGui
from PyQt4.QtCore import QThread
class MainWindow(QtGui.QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.initUI()
def initUI(self):
self.cmd_button = QtGui.QPushButton("Push", self)
self.cmd_button.clicked.connect(self.send_evt)
self.statusBar()
self.show()
def worker(self, inval):
print "in worker, received '%s'" % inval
time.sleep(2)
return "%s worked" % inval
@coroutine
def send_evt(self, arg):
out = AsyncTask(self.worker, "test string")
out2 = AsyncTask(self.worker, "another test string")
QThread.sleep(3)
print("kicked off async task, waiting for it to be done")
val = yield out
val2 = yield out2
print ("out is %s" % val)
print ("out2 is %s" % val2)
out = yield AsyncTask(self.worker, "Some other string")
print ("out is %s" % out)
if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
m = MainWindow()
sys.exit(app.exec_())
输出(当按钮被点击时):
in worker, received 'test string'
in worker, received 'another test string'
kicked off async task, waiting for it to be done
out is test string worked
out2 is another test string worked
in worker, received 'Some other string'
out is Some other string worked
如你所见,worker
会在每次通过AsyncTask
类调用时异步运行,但它的返回值可以直接从send_evt
中yield
出来,而不需要使用回调。
这段代码利用了Python生成器的协程支持特性(generator_object.send
),以及我在ActiveState上找到的一个示例,提供了子线程与主线程之间的通信机制,从而实现了一些非常基本的协程。这些协程功能有限:你不能从它们中返回任何东西,也不能将协程调用串联起来。虽然可能可以实现这两点,但如果你不真的需要,可能也不值得去做。我也没有对这个进行太多负面测试,所以在工作线程和其他地方的异常可能处理得不好。不过,它确实能很好地让你通过AsyncTask
类在不同线程中调用方法,并在结果准备好时从线程中yield
一个结果,而不会阻塞Qt事件循环。通常这种事情会用回调来完成,但回调可能会让人难以跟踪,而且通常比把所有代码放在一个函数中更难读。
如果你能接受我提到的这些限制,可以使用这种方法,但这其实只是一个概念验证;在考虑将其投入生产之前,你需要进行大量测试。
正如你提到的,Python 3.3和3.4通过引入yield from
和asyncio
使异步编程变得更简单。我认为yield from
在这里会非常有用,可以实现协程的链式调用(也就是说一个协程可以调用另一个并获取结果)。不过,asyncio
没有与PyQt4的事件循环集成,所以它的实用性相对有限。
另一个选择是完全放弃协程的部分,直接使用基于回调的线程间通信机制:
import sys
import time
from qtasync import CallbackEvent # No need for the coroutine stuff
from PyQt4 import QtGui
from PyQt4.QtCore import QThread
class MyThread(QThread):
""" Runs a function in a thread, and alerts the parent when done.
Uses a custom QEvent to alert the main thread of completion.
"""
def __init__(self, parent, func, on_finish, *args, **kwargs):
super(MyThread, self).__init__(parent)
self.on_finished = on_finish
self.func = func
self.args = args
self.kwargs = kwargs
self.start()
def run(self):
try:
result = self.func(*self.args, **self.kwargs)
except Exception as e:
print "e is %s" % e
result = e
finally:
CallbackEvent.post_to(self.parent(), self.on_finished, result)
class MainWindow(QtGui.QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.initUI()
def initUI(self):
self.cmd_button = QtGui.QPushButton("Push", self)
self.cmd_button.clicked.connect(self.send)
self.statusBar()
self.show()
def customEvent(self, event):
event.callback()
def worker(self, inval):
print("in worker, received '%s'" % inval)
time.sleep(2)
return "%s worked" % inval
def end_send(self, cmd):
print("send returned '%s'" % cmd)
def send(self, arg):
t = MyThread(self, self.worker, self.end_send, "some val")
print("Kicked off thread")
if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
m = MainWindow()
sys.exit(app.exec_())
输出:
Kicked off thread
in worker, received 'some val'
send returned 'some val worked'
如果你处理的是一个很长的回调链,这可能会变得有点麻烦,但它不依赖于那些尚未经过充分验证的coroutine
代码。