在Python后台线程完成侧任务之前阻塞主线程
我有一个使用线程的Python应用程序,后台线程里有一个长时间运行的主循环。这个后台主循环实际上是调用了 pyglet.app.run(),它负责驱动一个图形界面窗口,并且可以设置定期调用其他代码。我需要一个 do_stuff(duration)
函数,可以从主线程随时调用,以触发图形界面的动画,等待动画结束后再返回。实际的动画必须在后台线程中完成,因为这个图形库不能处理由不同线程驱动的情况。
我认为我需要做类似这样的事情:
import threading
class StuffDoer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.max_n_times = 0
self.total_n_times = 0
self.paused_ev = threading.Event()
def run(self):
# this part is outside of my control
while True:
self._do_stuff()
# do other stuff
def _do_stuff(self):
# this part is under my control
if self.paused_ev.is_set():
if self.max_n_times > self.total_n_times:
self.paused_ev.clear()
else:
if self.total_n_times >= self.max_n_times:
self.paused_ev.set()
if not self.paused_ev.is_set():
# do stuff that must execute in the background thread
self.total_n_times += 1
sd = StuffDoer()
sd.start()
def do_stuff(n_times):
sd.max_n_times += n_times
sd.paused_ev.wait_for_clear() # wait_for_clear() does not exist
sd.paused_ev.wait()
assert (sd.total_n_times == sd.max_n_times)
编辑:使用 max_n_times
代替 stop_time
,以澄清为什么 Thread.join(duration)
不行。
根据 threading.Event 的文档:
wait([timeout])
会阻塞,直到内部标志变为真。如果进入时内部标志已经为真,立即返回。否则,会阻塞,直到另一个线程调用 set() 将标志设置为真,或者直到可选的超时发生。
我发现如果我有一对事件 paused_ev
和 not_paused_ev
,并使用 not_paused_ev.wait()
,就能实现我想要的效果。我几乎可以直接使用 Thread.join(duration)
,但它需要在后台线程真正确认时间到的时候才返回。请问有没有其他的同步对象或策略我应该使用呢?
如果有人认为我整个思路有问题,我也乐意听取好的意见。
4 个回答
看起来你的图形界面动画线程在它的 while True
循环中使用了自旋锁。这种情况可以通过使用线程安全的队列来避免。根据我对你问题的理解,这种方法在功能上是等效的,而且效率也不错。
我省略了一些你代码中的细节,这些细节不会改变。我还假设你无法控制的 run() 方法使用了 self.stop_time 值来完成它的工作;否则就不需要线程安全的队列了。
from Queue import Queue
from threading import Event
class StuffDoer:
def __init__(self, inq, ready):
self.inq = inq
self.ready = ready
def _do_stuff(self):
self.ready.set()
self.stop_time = self.inq.get()
GUIqueue = Queue()
control = Event()
sd = StuffDoer(GUIqueue, control)
def do_stuff(duration):
control.clear()
GUIqueue.put(time.time() + duration)
control.wait()
希望我的评论能得到一些修改或额外的信息,但我有点想知道你是不是在过度使用线程的子类化。你可以这样做:
class MyWorker(object):
def __init__(self):
t = Thread(target = self._do_work, name "Worker Owned Thread")
t.daemon = True
t.start()
def _do_work(self):
While True:
# Something going on here, forever if necessary. This thread
# will go away if the other non-daemon threads terminate, possibly
# raising an exception depending this function's body.
我觉得这样做更合理,特别是当你想运行的方法更像是其他类的一个成员函数,而不是线程的运行方法。此外,这样做可以避免把很多业务逻辑封装在一个线程里面。当然,这只是我的个人看法。
我最后使用了一个队列,跟@wberry建议的差不多,并且用了Queue.task_done
和Queue.wait
这两个功能:
import Queue
import threading
class StuffDoer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.setDaemon(True)
self.max_n_times = 0
self.total_n_times = 0
self.do_queue = Queue.Queue()
def run(self):
# this part is outside of my control
while True:
self._do_stuff()
# do other stuff
def _do_stuff(self):
# this part is under my control
if self.total_n_times >= self.max_n_times:
try:
self.max_n_times += self.do_queue.get(block=False)
except Queue.Empty, e:
pass
if self.max_n_times > self.total_n_times:
# do stuff that must execute in the background thread
self.total_n_times += 1
if self.total_n_times >= self.max_n_times:
self.do_queue.task_done()
sd = StuffDoer()
sd.start()
def do_stuff(n_times):
sd.do_queue.put(n_times)
sd.do_queue.join()
assert (sd.total_n_times == sd.max_n_times)