多次调用 thread.timer()

6 投票
2 回答
5860 浏览
提问于 2025-04-17 22:30

这段代码:

from threading import Timer
import time

def hello():
    print "hello"

a=Timer(3,hello,())
a.start()
time.sleep(4)
a.start()

运行这个脚本后,我遇到了一个错误:RuntimeError: threads can only be started once。我该如何解决这个错误呢?我想要多次启动计时器。

2 个回答

2

因为我每次烤饼干的时候都会启动烤箱的计时器,所以看到Python的计时器只能用一次让我有点惊讶。
不过,我想分享一个小的计时器类,它在start方法上提供了一些额外的选项:

  • 可以返回自己,这样就可以用一行代码创建并启动计时器
  • 有一个可选参数,可以选择是否在计时器还在运行时重新启动一个新的计时器

实现代码:

from threading import Timer, Lock


class TimerEx(object):
    """
    A reusable thread safe timer implementation
    """

def __init__(self, interval_sec, function, *args, **kwargs):
    """
    Create a timer object which can be restarted

    :param interval_sec: The timer interval in seconds
    :param function: The user function timer should call once elapsed
    :param args: The user function arguments array (optional)
    :param kwargs: The user function named arguments (optional)
    """
    self._interval_sec = interval_sec
    self._function = function
    self._args = args
    self._kwargs = kwargs
    # Locking is needed since the '_timer' object might be replaced in a different thread
    self._timer_lock = Lock()
    self._timer = None

def start(self, restart_if_alive=True):
    """
    Starts the timer and returns this object [e.g. my_timer = TimerEx(10, my_func).start()]

    :param restart_if_alive: 'True' to start a new timer if current one is still alive
    :return: This timer object (i.e. self)
    """
    with self._timer_lock:
        # Current timer still running
        if self._timer is not None:
            if not restart_if_alive:
                # Keep the current timer
                return self
            # Cancel the current timer
            self._timer.cancel()
        # Create new timer
        self._timer = Timer(self._interval_sec, self.__internal_call)
        self._timer.start()
    # Return this object to allow single line timer start
    return self

def cancel(self):
    """
    Cancels the current timer if alive
    """
    with self._timer_lock:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

def is_alive(self):
    """
    :return: True if current timer is alive (i.e not elapsed yet)
    """
    with self._timer_lock:
        if self._timer is not None:
            return self._timer.is_alive()
    return False

def __internal_call(self):
    # Release timer object
    with self._timer_lock:
        self._timer = None
    # Call the user defined function
    self._function(*self._args, **self._kwargs)

这里有个例子:

from time import sleep

def my_func(msg):
    print(msg)

my_timer = TimerEx(interval_sec=5, function=my_func, msg="Here is my message").start()
sleep(10)
my_timer.start()
sleep(10)

注意:我使用的是Python 3.7,所以我不确定这个在Python 2上是否能完全运行

6

threading.Timer 是从 threading.Thread 这个类继承来的。简单来说,线程对象是不能重复使用的。也就是说,每次你想要使用定时器的时候,都需要创建一个新的 Timer 实例。

from threading import Timer
import time

class RepeatableTimer(object):
    def __init__(self, interval, function, args=[], kwargs={}):
        self._interval = interval
        self._function = function
        self._args = args
        self._kwargs = kwargs
    def start(self):
        t = Timer(self._interval, self._function, *self._args, **self._kwargs)
        t.start()

def hello():
    print "hello"

a=RepeatableTimer(3,hello,())
a.start()
time.sleep(4)
a.start()

撰写回答