在Python后台线程完成侧任务之前阻塞主线程

3 投票
4 回答
6363 浏览
提问于 2025-04-16 20:59

我有一个使用线程的Python应用程序,后台线程里有一个长时间运行的主循环。这个后台主循环实际上是调用了 pyglet.app.run(),它负责驱动一个图形界面窗口,并且可以设置定期调用其他代码。我需要一个 do_stuff(duration) 函数,可以从主线程随时调用,以触发图形界面的动画,等待动画结束后再返回。实际的动画必须在后台线程中完成,因为这个图形库不能处理由不同线程驱动的情况。

我认为我需要做类似这样的事情:

import threading

class StuffDoer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.max_n_times = 0
        self.total_n_times = 0
        self.paused_ev = threading.Event()

    def run(self):
        # this part is outside of my control
        while True:
            self._do_stuff()
            # do other stuff

    def _do_stuff(self):
        # this part is under my control
        if self.paused_ev.is_set():
            if self.max_n_times > self.total_n_times:
                self.paused_ev.clear()
        else:
            if self.total_n_times >= self.max_n_times:
                self.paused_ev.set()
        if not self.paused_ev.is_set():
            # do stuff that must execute in the background thread
            self.total_n_times += 1

sd = StuffDoer()
sd.start()

def do_stuff(n_times):
    sd.max_n_times += n_times
    sd.paused_ev.wait_for_clear()   # wait_for_clear() does not exist
    sd.paused_ev.wait()
    assert (sd.total_n_times == sd.max_n_times)

编辑:使用 max_n_times 代替 stop_time,以澄清为什么 Thread.join(duration) 不行。

根据 threading.Event 的文档:

wait([timeout])

会阻塞,直到内部标志变为真。如果进入时内部标志已经为真,立即返回。否则,会阻塞,直到另一个线程调用 set() 将标志设置为真,或者直到可选的超时发生。

我发现如果我有一对事件 paused_evnot_paused_ev,并使用 not_paused_ev.wait(),就能实现我想要的效果。我几乎可以直接使用 Thread.join(duration),但它需要在后台线程真正确认时间到的时候才返回。请问有没有其他的同步对象或策略我应该使用呢?

如果有人认为我整个思路有问题,我也乐意听取好的意见。

4 个回答

1

看起来你的图形界面动画线程在它的 while True 循环中使用了自旋锁。这种情况可以通过使用线程安全的队列来避免。根据我对你问题的理解,这种方法在功能上是等效的,而且效率也不错。

我省略了一些你代码中的细节,这些细节不会改变。我还假设你无法控制的 run() 方法使用了 self.stop_time 值来完成它的工作;否则就不需要线程安全的队列了。

from Queue import Queue
from threading import Event

class StuffDoer:
  def __init__(self, inq, ready):
    self.inq = inq
    self.ready = ready
  def _do_stuff(self):
    self.ready.set()
    self.stop_time = self.inq.get()

GUIqueue = Queue()
control = Event()

sd = StuffDoer(GUIqueue, control)

def do_stuff(duration):
  control.clear()
  GUIqueue.put(time.time() + duration)
  control.wait()
3

希望我的评论能得到一些修改或额外的信息,但我有点想知道你是不是在过度使用线程的子类化。你可以这样做:

class MyWorker(object):
  def __init__(self):
    t = Thread(target = self._do_work, name "Worker Owned Thread")

    t.daemon = True

    t.start()

  def _do_work(self):
    While True:
      # Something going on here, forever if necessary.  This thread
      # will go away if the other non-daemon threads terminate, possibly
      # raising an exception depending this function's body.

我觉得这样做更合理,特别是当你想运行的方法更像是其他类的一个成员函数,而不是线程的运行方法。此外,这样做可以避免把很多业务逻辑封装在一个线程里面。当然,这只是我的个人看法。

1

我最后使用了一个队列,跟@wberry建议的差不多,并且用了Queue.task_doneQueue.wait这两个功能:

import Queue
import threading

class StuffDoer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.max_n_times = 0
        self.total_n_times = 0
        self.do_queue = Queue.Queue()

    def run(self):
        # this part is outside of my control
        while True:
            self._do_stuff()
            # do other stuff

    def _do_stuff(self):
        # this part is under my control
        if self.total_n_times >= self.max_n_times:
            try:
                self.max_n_times += self.do_queue.get(block=False)
            except Queue.Empty, e:
                pass
        if self.max_n_times > self.total_n_times:
            # do stuff that must execute in the background thread
            self.total_n_times += 1
            if self.total_n_times >= self.max_n_times:
                self.do_queue.task_done()

sd = StuffDoer()
sd.start()

def do_stuff(n_times):
    sd.do_queue.put(n_times)
    sd.do_queue.join()
    assert (sd.total_n_times == sd.max_n_times)

撰写回答