多次调用 thread.timer()
这段代码:
from threading import Timer
import time
def hello():
print "hello"
a=Timer(3,hello,())
a.start()
time.sleep(4)
a.start()
运行这个脚本后,我遇到了一个错误:RuntimeError: threads can only be started once
。我该如何解决这个错误呢?我想要多次启动计时器。
2 个回答
2
因为我每次烤饼干的时候都会启动烤箱的计时器,所以看到Python的计时器只能用一次让我有点惊讶。
不过,我想分享一个小的计时器类,它在start方法上提供了一些额外的选项:
- 可以返回自己,这样就可以用一行代码创建并启动计时器
- 有一个可选参数,可以选择是否在计时器还在运行时重新启动一个新的计时器
实现代码:
from threading import Timer, Lock
class TimerEx(object):
"""
A reusable thread safe timer implementation
"""
def __init__(self, interval_sec, function, *args, **kwargs):
"""
Create a timer object which can be restarted
:param interval_sec: The timer interval in seconds
:param function: The user function timer should call once elapsed
:param args: The user function arguments array (optional)
:param kwargs: The user function named arguments (optional)
"""
self._interval_sec = interval_sec
self._function = function
self._args = args
self._kwargs = kwargs
# Locking is needed since the '_timer' object might be replaced in a different thread
self._timer_lock = Lock()
self._timer = None
def start(self, restart_if_alive=True):
"""
Starts the timer and returns this object [e.g. my_timer = TimerEx(10, my_func).start()]
:param restart_if_alive: 'True' to start a new timer if current one is still alive
:return: This timer object (i.e. self)
"""
with self._timer_lock:
# Current timer still running
if self._timer is not None:
if not restart_if_alive:
# Keep the current timer
return self
# Cancel the current timer
self._timer.cancel()
# Create new timer
self._timer = Timer(self._interval_sec, self.__internal_call)
self._timer.start()
# Return this object to allow single line timer start
return self
def cancel(self):
"""
Cancels the current timer if alive
"""
with self._timer_lock:
if self._timer is not None:
self._timer.cancel()
self._timer = None
def is_alive(self):
"""
:return: True if current timer is alive (i.e not elapsed yet)
"""
with self._timer_lock:
if self._timer is not None:
return self._timer.is_alive()
return False
def __internal_call(self):
# Release timer object
with self._timer_lock:
self._timer = None
# Call the user defined function
self._function(*self._args, **self._kwargs)
这里有个例子:
from time import sleep
def my_func(msg):
print(msg)
my_timer = TimerEx(interval_sec=5, function=my_func, msg="Here is my message").start()
sleep(10)
my_timer.start()
sleep(10)
注意:我使用的是Python 3.7,所以我不确定这个在Python 2上是否能完全运行
6
threading.Timer
是从 threading.Thread
这个类继承来的。简单来说,线程对象是不能重复使用的。也就是说,每次你想要使用定时器的时候,都需要创建一个新的 Timer
实例。
from threading import Timer
import time
class RepeatableTimer(object):
def __init__(self, interval, function, args=[], kwargs={}):
self._interval = interval
self._function = function
self._args = args
self._kwargs = kwargs
def start(self):
t = Timer(self._interval, self._function, *self._args, **self._kwargs)
t.start()
def hello():
print "hello"
a=RepeatableTimer(3,hello,())
a.start()
time.sleep(4)
a.start()