有没有优雅或Pythonic的方式在线程中中断time.sleep()调用?

6 投票
3 回答
6409 浏览
提问于 2025-04-18 12:23

下面的代码按我预期的方式工作,具体来说:

  • 有一个叫“Ernie”的线程,它从1数到8,每数一次就睡1秒
  • 还有一个多余的用户界面组件,叫“Bert”
  • 在正常情况下,程序会一直运行,直到线程结束和用户界面关闭
  • 如果按下Ctrl-C,程序会在正常完成之前优雅地停止

为了实现这个功能,我把1秒的睡眠时间分成了50毫秒的小块,每次检查一个标志。

有没有更符合Python风格的方法,让线程在某段时间内(比如1秒)睡眠,同时可以通过某个标志或信号来中断?

        try:
            for i in xrange(8):
                print "i=%d" % i
                for _ in xrange(20):
                    time.sleep(0.05)
                    if not self.running:
                        raise GracefulShutdown
        except GracefulShutdown:
            print "ernie exiting"        

我更想这样做,并在线程中引发一个GracefulShutdown异常:

        try:
            for i in xrange(8):
                print "i=%d" % i
                time.sleep(1)
                # somehow allow another thread to raise GracefulShutdown
                # during the sleep() call
        except GracefulShutdown:
            print "ernie exiting"        

完整程序:

from PySide import QtCore, QtGui
from PySide.QtGui import QApplication
import sys
import signal
import time

class GracefulShutdown(Exception):
    pass

class Ernie(QtCore.QThread):
    def __init__(self):
        super(Ernie, self).__init__()
        self.running = True
    def run(self):
        try:
            for i in xrange(8):
                print "i=%d" % i
                for _ in xrange(20):
                    time.sleep(0.05)
                    if not self.running:
                        raise GracefulShutdown
        except GracefulShutdown:
            print "ernie exiting"        
    def shutdown(self):
        print "ernie received request to shutdown"
        self.running = False

class Bert(object):
    def __init__(self, argv):
        self.app = QApplication(argv)
        self.app.quitOnLastWindowClosed = False
    def show(self):
        widg = QtGui.QWidget()
        widg.resize(250, 150)
        widg.setWindowTitle('Simple')
        widg.show()
        self.widg = widg
        return widg
    def shutdown(self):
        print "bert exiting"
        self.widg.close()
    def start(self):
        # return control to the Python interpreter briefly every 100 msec
        timer = QtCore.QTimer()
        timer.start(100)
        timer.timeout.connect(lambda: None) 
        return self.app.exec_()        

def handleInterrupts(*actors):
    def handler(sig, frame):
        print "caught interrupt"
        for actor in actors:
            actor.shutdown()
    signal.signal(signal.SIGINT, handler)

bert = Bert(sys.argv)
gratuitousWidget = bert.show()
ernie = Ernie()
ernie.start()

handleInterrupts(bert, ernie)

retval = bert.start()
print "bert finished"
while not ernie.wait(100):
    # return control to the Python interpreter briefly every 100 msec
    pass
print "ernie finished"
sys.exit(retval)

3 个回答

1

我的直觉是用os.kill来发送信号,但只有主线程能接收到信号,所以Ernie无法通过这种方式被打断。文档建议使用锁来解决这个问题。

我想到的办法是创建一个锁,只有在需要结束Ernie的时候才能访问。主线程在创建了Bert和Ernie之后,会创建一个锁文件并将其锁定。然后,Ernie不是睡一秒钟,而是会花整整一秒钟去尝试获取这个锁。一旦程序需要关闭,就可以释放这个锁,Ernie会立刻获取到这个锁;这就告诉Ernie是时候关闭了。

由于我们无法像想象中那样将信号和线程结合在一起,这里有另一篇文章讨论了线程中的锁超时问题:

如何在Python 2.7中实现带超时的锁

我不能告诉你这个解决方案有多“Pythonic”,因为我还在努力理解什么才算是“Pythonic”。一旦你开始引入线程,写出优雅的代码就变得越来越困难了。

3

通常情况下,SIGINT信号会打断time.sleep的调用,但Python只允许应用程序的主线程接收信号,所以在这里不能使用。我建议如果可能的话,避免使用time.sleep,而是使用QTimer

from PySide import QtCore, QtGui
from PySide.QtCore import QTimer
from PySide.QtGui import QApplication
import sys 
import signal
from functools import partial

class Ernie(QtCore.QThread):
    def __init__(self):
        super(Ernie, self).__init__()

    def do_print(self, cur_num, max_num):
        print "i=%d" % cur_num
        cur_num += 1
        if cur_num < max_num:
            func = partial(self.do_print, cur_num, max_num)
            QTimer.singleShot(1000, func)
        else:
            self.exit()

    def run(self):
        self.do_print(0, 8)
        self.exec_()  # QTimer requires the event loop for the thread be running.
        print "ernie exiting"    


class Bert(object):
    def __init__(self, argv):
        self.app = QApplication(argv)
        self.app.quitOnLastWindowClosed = False
    def show(self):
        widg = QtGui.QWidget()
        widg.resize(250, 150)
        widg.setWindowTitle('Simple')
        widg.show()
        self.widg = widg
        return widg
    def shutdown(self):
        print "bert exiting"
        self.widg.close()
    def start(self):
        # return control to the Python interpreter briefly every 100 msec
        timer = QtCore.QTimer()
        timer.start(100)
        timer.timeout.connect(lambda: None) 
        return self.app.exec_()            

def handleInterrupts(*actors):
    def handler(sig, frame):
        print "caught interrupt"
        for actor in actors:
            actor.shutdown()
    signal.signal(signal.SIGINT, handler)

bert = Bert(sys.argv)
gratuitousWidget = bert.show()
ernie = Ernie()
ernie.start()

handleInterrupts(bert)

retval = bert.start()
print "bert finished"
ernie.exit()
while not ernie.wait(100):
    # return control to the Python interpreter briefly every 100 msec
    pass
print "ernie finished"
sys.exit(retval)

与其让run()方法在一个循环中使用time.sleep,我们可以在线程内部启动一个事件循环,并使用QTimer在设定的时间间隔内进行打印。这样,我们可以随时调用bernie.exit()来关闭线程,这样bernie的事件循环就会立即关闭。

编辑:

这里有一种替代的方法来实现同样的想法,至少可以把一些复杂性隐藏起来,让原来的循环保持不变:

def coroutine(func):
    def wrapper(*args, **kwargs):
        def execute(gen):
            try:
                op = gen.next() # run func until a yield is hit
                # Determine when to resume execution of the coroutine.
                # If func didn't yield anything, schedule it to run again
                # immediately by setting timeout to 0.
                timeout = op or 0 
                func = partial(execute, gen)
                QTimer.singleShot(timeout, func) # This schedules execute to run until the next yield after `timeout` milliseconds.
            except StopIteration:
                return

        gen = func(*args, **kwargs) # Get a generator object for the decorated function.
        execute(gen) 
    return wrapper

def async_sleep(timeout):
    """ When yielded inside a coroutine, triggers a `timeout` length sleep. """
    return timeout

class Ernie(QtCore.QThread):
    def __init__(self):
        super(Ernie, self).__init__()
        self.cur_num = 0 
        self.max_num = 8 

    @coroutine
    def do_print(self):
        for i in range(8):
            print "i=%d" % i 
            yield async_sleep(1000) # This could also just be yield 1000
        self.exit()

    def run(self):
        self.do_print() # Schedules do_print to run once self.exec_() is run.
        self.exec_()
        print "ernie exiting"    

coroutine允许被装饰的函数在出现yield时将控制权交回给Qt事件循环,并在之后继续执行被装饰的方法。确实,这只是把复杂性从我原来的例子中转移出去,但它确实把这些复杂性隐藏在你在线程中想要做的实际工作之外。

它是如何工作的:

这个方法的灵感来自于一些异步库中的协程实现,比如Tornadoasyncio模块。虽然我没有尝试做得像它们那样强大,但思路是一样的。我们希望能够中断的方法被实现为生成器,并用一个装饰器装饰,这个装饰器知道如何调用生成器并接收响应,从而能够正确地暂停和恢复生成器。当调用do_print时,流程大致如下:

  1. do_print()run中被调用。这实际上会导致调用coroutine.wrapper
  2. wrapper调用真正的do_print,它返回一个生成器对象。然后将这个对象传递给execute
  3. execute在生成器对象上调用next。这会导致do_print运行直到遇到yield。此时,do_print的执行会被暂停。
  4. execute安排do_print恢复执行。它通过首先确定何时安排它来做到这一点,方法是使用上一次运行的do_printyield值,或者默认值为0(这会立即恢复执行)。它调用QTimer.singleShot来安排自己在timeout毫秒后再次运行,同时使用partial来传递生成器对象。
  5. 步骤3-4会重复,直到do_print停止产生yield,调用self.exit()并返回,此时会抛出StopIteration,而coroutine装饰器会简单地返回,而不是安排另一个execute调用。
8

我不太确定这样做在Python中算不算优雅,但它确实能工作。只需要用一个队列,然后用带有超时的阻塞获取。下面是一个例子:

import threading
import Queue
import time

q = Queue.Queue()


def workit():
    for i in range(10):
        try:
            q.get(timeout=1)
            print '%s: Was interrupted' % time.time()
            break
        except Queue.Empty:
            print '%s: One second passed' % time.time()


th = threading.Thread(target=workit)
th.start()

time.sleep(3.2)
q.put(None)

撰写回答