在Python中延迟函数执行
在JavaScript中,我习惯于可以调用一些函数,让它们在稍后的时间执行,就像这样:
function foo() {
alert('bar');
}
setTimeout(foo, 1000);
这样做不会阻止其他代码的执行。
我不知道在Python中怎么做到类似的事情。我可以使用sleep:
import time
def foo():
print('bar')
time.sleep(1)
foo()
但这样会阻止其他代码的执行。(实际上在我的情况下,阻止Python本身并不是问题,但我就无法对这个方法进行单元测试了。)
我知道线程是为了实现不同步的执行而设计的,但我在想是否有更简单的方法,像JavaScript中的setTimeout
或setInterval
那样的。
6 个回答
17
如果你想在一段时间后执行一个函数,或者想让一个函数每隔几秒重复执行,而不使用线程,你可以这样做:
Tkinter
#!/usr/bin/env python
from Tkinter import Tk
def foo():
print("timer went off!")
def countdown(n, bps, root):
if n == 0:
root.destroy() # exit mainloop
else:
print(n)
root.after(1000 / bps, countdown, n - 1, bps, root) # repeat the call
root = Tk()
root.withdraw() # don't show the GUI window
root.after(4000, foo) # call foo() in 4 seconds
root.after(0, countdown, 10, 2, root) # show that we are alive
root.mainloop()
print("done")
输出
10
9
8
7
6
5
4
3
timer went off!
2
1
done
Gtk
#!/usr/bin/env python
from gi.repository import GObject, Gtk
def foo():
print("timer went off!")
def countdown(n): # note: a closure could have been used here instead
if n[0] == 0:
Gtk.main_quit() # exit mainloop
else:
print(n[0])
n[0] -= 1
return True # repeat the call
GObject.timeout_add(4000, foo) # call foo() in 4 seconds
GObject.timeout_add(500, countdown, [10])
Gtk.main()
print("done")
输出
10
9
8
7
6
5
4
timer went off!
3
2
1
done
Twisted
#!/usr/bin/env python
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
def foo():
print("timer went off!")
def countdown(n):
if n[0] == 0:
reactor.stop() # exit mainloop
else:
print(n[0])
n[0] -= 1
reactor.callLater(4, foo) # call foo() in 4 seconds
LoopingCall(countdown, [10]).start(.5) # repeat the call in .5 seconds
reactor.run()
print("done")
输出
10
9
8
7
6
5
4
3
timer went off!
2
1
done
Asyncio
Python 3.4 引入了一个新的临时接口用于异步输入输出 -- asyncio
模块:
#!/usr/bin/env python3.4
import asyncio
def foo():
print("timer went off!")
def countdown(n):
if n[0] == 0:
loop.stop() # end loop.run_forever()
else:
print(n[0])
n[0] -= 1
def frange(start=0, stop=None, step=1):
while stop is None or start < stop:
yield start
start += step #NOTE: loss of precision over time
def call_every(loop, seconds, func, *args, now=True):
def repeat(now=True, times=frange(loop.time() + seconds, None, seconds)):
if now:
func(*args)
loop.call_at(next(times), repeat)
repeat(now=now)
loop = asyncio.get_event_loop()
loop.call_later(4, foo) # call foo() in 4 seconds
call_every(loop, 0.5, countdown, [10]) # repeat the call every .5 seconds
loop.run_forever()
loop.close()
print("done")
输出
10
9
8
7
6
5
4
3
timer went off!
2
1
done
注意:这些方法在接口和行为上有一些细微的差别。
8
你想要一个来自Timer
对象,这个对象属于threading
模块。
from threading import Timer
from time import sleep
def foo():
print "timer went off!"
t = Timer(4, foo)
t.start()
for i in range(11):
print i
sleep(.5)
如果你想要重复执行某个操作,这里有个简单的解决办法:与其使用Timer
,不如直接用Thread
,并传入一个像这样工作的函数:
def call_delay(delay, repetitions, func, *args, **kwargs):
for i in range(repetitions):
sleep(delay)
func(*args, *kwargs)
这样做不会造成无限循环,因为无限循环可能会导致线程无法结束,或者出现其他不好的情况,如果处理不当的话。一个更复杂的方法可能会使用基于Event
的方式,像这样。