改进线程Spinner类的键盘中断检测

2 投票
1 回答
1526 浏览
提问于 2025-04-16 13:04

好的,我写了这个类,参考了很多我在谷歌代码搜索中找到的其他Spinner类。

它现在能正常工作,但我想找个更好的方法来处理键盘中断和系统退出的异常。有没有更好的办法呢?

这是我的代码:

#!/usr/bin/env python
import itertools
import sys
import threading

class Spinner(threading.Thread):
    '''Represent a random work indicator, handled in a separate thread'''

    # Spinner glyphs
    glyphs = ('|', '/', '-', '\\', '|', '/', '-')
    # Output string format
    output_format = '%-78s%-2s'
    # Message to output while spin
    spin_message = ''
    # Message to output when done
    done_message = ''
    # Time between spins
    spin_delay = 0.1

    def __init__(self, *args, **kwargs):
        '''Spinner constructor'''
        threading.Thread.__init__(self, *args, **kwargs)
        self.daemon = True
        self.__started = False
        self.__stopped = False
        self.__glyphs = itertools.cycle(iter(self.glyphs))

    def __call__(self, func, *args, **kwargs):
        '''Convenient way to run a routine with a spinner'''
        self.init()
        skipped = False

        try:
            return func(*args, **kwargs)
        except (KeyboardInterrupt, SystemExit):
            skipped = True
        finally:
            self.stop(skipped)

    def init(self):
        '''Shows a spinner'''
        self.__started = True
        self.start()

    def run(self):
        '''Spins the spinner while do some task'''
        while not self.__stopped:
            self.spin()

    def spin(self):
        '''Spins the spinner'''
        if not self.__started:
            raise NotStarted('You must call init() first before using spin()')

        if sys.stdin.isatty():
            sys.stdout.write('\r')
            sys.stdout.write(self.output_format % (self.spin_message,
                                                   self.__glyphs.next()))
            sys.stdout.flush()
            time.sleep(self.spin_delay)

    def stop(self, skipped=None):
        '''Stops the spinner'''
        if not self.__started:
            raise NotStarted('You must call init() first before using stop()')

        self.__stopped = True
        self.__started = False

        if sys.stdin.isatty() and not skipped:
            sys.stdout.write('\b%s%s\n' % ('\b' * len(self.done_message),
                                           self.done_message))
            sys.stdout.flush()

class NotStarted(Exception):
    '''Spinner not started exception'''
    pass

if __name__ == '__main__':
    import time

    # Normal example
    spinner1 = Spinner()
    spinner1.spin_message = 'Scanning...'
    spinner1.done_message = 'DONE'
    spinner1.init()
    skipped = False

    try:
        time.sleep(5)
    except (KeyboardInterrupt, SystemExit):
        skipped = True
    finally:
        spinner1.stop(skipped)

    # Callable example
    spinner2 = Spinner()
    spinner2.spin_message = 'Scanning...'
    spinner2.done_message = 'DONE'
    spinner2(time.sleep, 5)

提前谢谢你。

1 个回答

2

你可能不需要太担心捕捉 SystemExit,因为它是由 sys.exit() 引发的。你可能想在程序退出前捕捉它,以便清理一些资源。

另一种捕捉 KeyboardInterrupt 的方法是注册一个信号处理器来捕捉 SIGINT。不过在你的例子中,使用 try..except 更合适,所以你走在正确的道路上。

这里有一些小建议:

  • 也许可以把 __call__ 方法改名为 start,这样更清楚你是在启动一个任务。
  • 你还可以通过在 start 方法中添加一个新线程来让 Spinner 类更具可重用性,而不是在构造函数中。
  • 还要考虑当用户按下 CTRL-C 时当前的旋转任务会发生什么——下一个任务能否启动,还是应用程序应该直接退出?
  • 你也可以把 spin_message 作为 start 的第一个参数,这样可以把它和即将运行的任务关联起来。

例如,下面是某人如何使用 Spinner 的示例:

dbproc = MyDatabaseProc()
spinner = Spinner()
spinner.done_message = 'OK'
try:
    spinner.start("Dropping the database", dbproc.drop, "mydb")
    spinner.start("Re-creating the database", dbproc.create, "mydb")
    spinner.start("Inserting data into tables", dbproc.populate)
    ...
except (KeyboardInterrupt, SystemExit):
    # stop the currently executing job
    spinner.stop()
    # do some cleanup if needed..
    dbproc.cleanup()

撰写回答