如何从QThread和队列运行的函数中返回值
请解释一下我们如何通过队列管理的线程发送和接收数据……
首先,我创建一个'QThread'的子类,并定义它的run()
方法,这个方法会在调用QThread
的.start()
时启动:
class SimpleThread(QtCore.QThread):
def __init__(self, queue, parent=None):
QtCore.QThread.__init__(self, parent)
self.queue=queue
def run(self):
while True:
arg=self.queue.get()
self.fun(arg)
self.queue.task_done()
def fun(self, arg):
for i in range (3):
print 'fun: %s'%i
self.sleep(1)
return arg+1
然后我声明两个线程实例(这样只占用两个CPU核心),并将self.queue
实例作为参数传入。
self.queue=queue.Queue()
for i in range(2):
thread=SimpleThread(self.queue)
thread.start()
现在如果我理解得没错,thread.start()
并没有真正启动任何东西。真正的“启动”只有在我调用queue.put()
时才会发生:
for arg in [1,2,3]: self.queue.put(arg)
这一行才是真正的调用。除了创建和启动队列项,put()
还允许将任何任意值保存到每个队列项中。.put()
同时做了几件事:它创建、启动、通过队列移动处理,并允许将一个变量“放入”队列项中(这个变量稍后可以通过函数处理器内部使用队列项的.get()
方法来获取)。
但是我该如何从fun()
函数中返回值呢?一个“常规”的fun()
的return resultValue
并不起作用。而且我不能使用self.queue.put()
方法,因为这个方法除了存储数据外,还会“创建”一个新的队列项……
后续编辑:
这里有一段稍微调整过的代码(从另一篇帖子复制粘贴过来),展示了一种如何从完成的线程返回值的方法。我不确定这里使用的方法是否适用于QThread……如果我错了,请纠正我:
import os, sys
import threading
import Queue
def callMe(incomingFun, daemon=False):
def execute(_queue, *args, **kwargs):
result=incomingFun(*args, **kwargs)
_queue.put(result)
def wrap(*args, **kwargs):
_queue=Queue.Queue()
_thread=threading.Thread(target=execute, args=(_queue,)+args, kwargs=kwargs)
_thread.daemon=daemon
_thread.start()
_thread.result_queue=_queue
return _thread
return wrap
@callMe
def localFunc(x):
import time
x = x + 5
time.sleep(5)
return x
thread=localFunc(10)
# this blocks, waiting for the result
result = thread.result_queue.get()
print result
1 个回答
在正常情况下,你会使用一个结果队列来发送结果,然后让另一个线程在那儿等着结果:
class SimpleThread(QtCore.QThread):
def __init__(self, queue, result_queue, parent=None):
QtCore.QThread.__init__(self, parent)
self.queue=queue
self.result_queue = result_queue
def run(self):
while True:
arg=self.queue.get()
self.fun(arg)
self.queue.task_done()
def fun(self, arg):
for i in range (3):
print 'fun: %s'%i
self.sleep(1)
self.result_queue.put(arg+1)
def handle_results(result_queue):
while True:
result = result_queue.get()
print("Got result {}".format(result))
主线程:
self.queue=queue.Queue()
self.result_queue = queue.Queue()
result_handler = threading.Thread(target=handle_results, self.result_queue)
for i in range(2):
thread=SimpleThread(self.queue, self.result_queue)
thread.start()
这样做可以避免在等待结果时阻塞图形界面的事件循环。下面是使用 multiprocessing.pool.ThreadPool
的等效方式:
from multiprocessing.pool import ThreadPool
import time
def fun(arg):
for i in range (3):
print 'fun: %s'%i
time.sleep(1)
return arg+1
def handle_result(result):
print("got result {}".format(result))
pool = ThreadPool(2)
pool.map_async(fun, [1,2,3], callback=handle_result)
这样做简单多了。它内部会创建一个处理结果的线程,当 fun
完成时,会自动调用 handle_result
。
不过,你现在使用的是 QThread
,而且你希望结果能更新图形界面的控件,所以你其实想把结果发送回主线程,而不是一个处理结果的线程。在这种情况下,使用 Qt 的信号系统是有意义的,这样你就可以在收到结果时安全地更新图形界面:
from PyQt4 import QtCore, QtGui
import sys
import Queue as queue
class ResultObj(QtCore.QObject):
def __init__(self, val):
self.val = val
class SimpleThread(QtCore.QThread):
finished = QtCore.pyqtSignal(object)
def __init__(self, queue, callback, parent=None):
QtCore.QThread.__init__(self, parent)
self.queue = queue
self.finished.connect(callback)
def run(self):
while True:
arg = self.queue.get()
if arg is None: # None means exit
print("Shutting down")
return
self.fun(arg)
def fun(self, arg):
for i in range(3):
print 'fun: %s' % i
self.sleep(1)
self.finished.emit(ResultObj(arg+1))
class AppWindow(QtGui.QMainWindow):
def __init__(self):
super(AppWindow, self).__init__()
mainWidget = QtGui.QWidget()
self.setCentralWidget(mainWidget)
mainLayout = QtGui.QVBoxLayout()
mainWidget.setLayout(mainLayout)
button = QtGui.QPushButton('Process')
button.clicked.connect(self.process)
mainLayout.addWidget(button)
def handle_result(self, result):
val = result.val
print("got val {}".format(val))
# You can update the UI from here.
def process(self):
MAX_CORES=2
self.queue = queue.Queue()
self.threads = []
for i in range(MAX_CORES):
thread = SimpleThread(self.queue, self.handle_result)
self.threads.append(thread)
thread.start()
for arg in [1,2,3]:
self.queue.put(arg)
for _ in range(MAX_CORES): # Tell the workers to shut down
self.queue.put(None)
app = QtGui.QApplication([])
window = AppWindow()
window.show()
sys.exit(app.exec_())
当按钮被按下时的输出:
fun: 0
fun: 0
fun: 1
fun: 1
fun: 2
fun: 2
fun: 0
got val 2
got val 3
Shutting down
fun: 1
fun: 2
Shutting down
got val 4