如何在Python中优雅地终止按下CTRL+C时的循环
我刚接触Python,遇到了一个问题。
我有一个脚本,它会逐个处理文件,并根据输入文件的名字把结果写到不同的文件里。有时候我需要中断这个脚本,但我希望它能先完成当前文件的处理,然后再终止(这样可以避免生成信息不完整的结果文件)。我该怎么在Python中实现这个功能呢?
这是我尝试过的方法。
a) 使用try-except块
x = 1
print "Script started."
while True:
try:
print "Processing file #",x,"started...",
# do something time-cosnuming
time.sleep(1)
x += 1
print " finished."
except KeyboardInterrupt:
print "Bye"
print "x=",x
sys.exit()
sys.exit()
输出:
Script started.
Processing file # 1 started... finished.
Processing file # 2 started... finished.
Processing file # 3 started... Bye
x= 3
第3次迭代没有正常结束。
b) 使用sys.excepthook
OriginalExceptHook = sys.excepthook
def NewExceptHook(type, value, traceback):
global Terminator
Terminator = True
if type == KeyboardInterrupt:
#exit("\nExiting by CTRL+C.") # this line was here originally
print("\n\nExiting by CTRL+C.\n\n")
else:
OriginalExceptHook(type, value, traceback)
sys.excepthook = NewExceptHook
global Terminator
Terminator = False
x = 1
while True:
print "Processing file #",x,"started...",
# do something time-cosnuming
time.sleep(1)
x += 1
print " finished."
if Terminator:
print "I'll be back!"
break
print "Bye"
print "x=",x
sys.exit()
输出:
Script started.
Processing file # 1 started... finished.
Processing file # 2 started... finished.
Processing file # 3 started...
Exiting by CTRL+C.
第3次迭代没有正常结束。
更新#1
@mguijarr,我稍微修改了代码,如下:
import time, sys
x = 1
print "Script started."
stored_exception=None
while True:
try:
print "Processing file #",x,"started...",
# do something time-cosnuming
time.sleep(1)
print "Processing file #",x,"part two...",
time.sleep(1)
print " finished."
if stored_exception:
break
x += 1
except KeyboardInterrupt:
print "[CTRL+C detected]",
stored_exception=sys.exc_info()
print "Bye"
print "x=",x
if stored_exception:
raise stored_exception[0], stored_exception[1], stored_exception[2]
sys.exit()
输出是(在Win7-64位上使用“Python 2.7.6 :: Anaconda 2.0.0 (64-bit)”测试):
Script started.
Processing file # 1 started... Processing file # 1 part two... finished.
Processing file # 2 started... Processing file # 2 part two... finished.
Processing file # 3 started... [CTRL+C detected] Processing file # 3 started... Processing file # 3 part two... finished.
Bye
x= 3
Traceback (most recent call last):
File "test2.py", line 12, in <module>
time.sleep(1)
KeyboardInterrupt
在这种情况下,第3次迭代实际上被重新启动了,这看起来很奇怪,并不是我想要的结果。有没有办法避免这种情况?
我在'print'语句中去掉了逗号,并添加了更多内容,以便确认迭代确实被重新启动:
import time, sys
x = 1
y = 0
print "Script started."
stored_exception=None
while True:
try:
y=x*1000
y+=1
print "Processing file #",x,y,"started..."
y+=1
# do something time-cosnuming
y+=1
time.sleep(1)
y+=1
print "Processing file #",x,y,"part two..."
y+=1
time.sleep(1)
y+=1
print " finished.",x,y
y+=1
if stored_exception:
break
y+=1
x += 1
y+=1
except KeyboardInterrupt:
print "[CTRL+C detected]",
stored_exception=sys.exc_info()
print "Bye"
print "x=",x
print "y=",y
if stored_exception:
raise stored_exception[0], stored_exception[1], stored_exception[2]
sys.exit()
输出是:
Script started.
Processing file # 1 1001 started...
Processing file # 1 1004 part two...
finished. 1 1006
Processing file # 2 2001 started...
Processing file # 2 2004 part two...
[CTRL+C detected] Processing file # 2 2001 started...
Processing file # 2 2004 part two...
finished. 2 2006
Bye
x= 2
y= 2007
Traceback (most recent call last):
File "test2.py", line 20, in <module>
time.sleep(1)
KeyboardInterrupt
4 个回答
0
虽然我来得有点晚,但这是我的解决办法:
#!/usr/bin/env python
# -*- coding: utf8 -*-
import sys
import signal
import time
interrupted = False
def main():
global interrupted
oldHandler = signal.signal(signal.SIGINT, handle_interrupt)
x = 1
print "Script started"
while True:
print "Processing file #",x,"started...",
# do something time-cosnuming
time.sleep(1)
x += 1
print " finished."
if interrupted:
break
signal.signal(signal.SIGINT, oldHandler)
def handle_interrupt(sig, frame):
global interrupted
interrupted = True
if __name__ == '__main__':
sys.exit(main())
12
你可以写一个信号处理函数
import signal,sys,time
terminate = False
def signal_handling(signum,frame):
global terminate
terminate = True
signal.signal(signal.SIGINT,signal_handling)
x=1
while True:
print "Processing file #",x,"started..."
time.sleep(1)
x+=1
if terminate:
print "I'll be back"
break
print "bye"
print x
按下 Ctrl+c 会发送一个 SIGINT 中断,这会输出:
Processing file # 1 started...
Processing file # 2 started...
^CI'll be back
bye
3
16
我觉得创建一个有状态的类来处理用户异常会更优雅,因为这样我就不需要去处理那些在不同模块之间不管用的全局变量了。
import signal
import time
class GracefulExiter():
def __init__(self):
self.state = False
signal.signal(signal.SIGINT, self.change_state)
def change_state(self, signum, frame):
print("exit flag set to True (repeat to exit now)")
signal.signal(signal.SIGINT, signal.SIG_DFL)
self.state = True
def exit(self):
return self.state
x = 1
flag = GracefulExiter()
while True:
print("Processing file #",x,"started...")
time.sleep(1)
x+=1
print(" finished.")
if flag.exit():
break
18
我会简单地使用一个异常处理器,这样可以捕捉到 KeyboardInterrupt
这个异常,并把它存储起来。然后,在每次循环结束的时候,如果有异常在等待处理,我就会跳出循环,并重新抛出这个异常(这样可以让正常的异常处理机制有机会运行)。
这个方法是有效的(在 Python 2.7 中测试过):
x = 1
print "Script started."
stored_exception=None
while True:
try:
print "Processing file #",x,"started...",
# do something time-cosnuming
time.sleep(1)
print " finished."
if stored_exception:
break
x += 1
except KeyboardInterrupt:
stored_exception=sys.exc_info()
print "Bye"
print "x=",x
if stored_exception:
raise stored_exception[0], stored_exception[1], stored_exception[2]
sys.exit()
编辑:正如评论中提到的,这个答案对提问者来说并不满意,这里有一个基于线程的解决方案:
import time
import sys
import threading
print "Script started."
class MyProcessingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
print "Processing file #",x,"started...",
# do something time-cosnuming
time.sleep(1)
print " finished."
for x in range(1,4):
task = MyProcessingThread()
task.start()
try:
task.join()
except KeyboardInterrupt:
break
print "Bye"
print "x=",x
sys.exit()