在一定时间后中断函数执行

31 投票
5 回答
45861 浏览
提问于 2025-04-18 15:16

在Python中,举个简单的例子:

for x in range(0, 3):
    # Call function A(x)

我想要在一个for循环中,如果函数A花费超过五秒钟,就跳过这个函数,这样我就不会卡住或者浪费时间。

通过一些搜索,我发现使用子进程或者线程可能会有帮助,但我不知道该怎么在这里实现。

5 个回答

3

评论说得对,你应该检查里面的内容。这里有一个可能的解决方案。需要注意的是,异步函数(比如使用线程)和这个解决方案是不同的。这个解决方案是同步的,也就是说它会一个接一个地运行。

import time

for x in range(0,3):
    someFunction()

def someFunction():
    start = time.time()
    while (time.time() - start < 5):
        # do your normal function

    return;
4

这听起来像是个更好的主意(抱歉,我还不太确定Python里这些东西的名称):

import signal

def signal_handler(signum, frame):
    raise Exception("Timeout!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(3) # Three seconds
try:
    for x in range(0, 3):
        # Call function A(x)
except Exception, msg:
    print "Timeout!"
signal.alarm(0) # Reset
12

如果你能把工作分成小块,并且定期检查一下,那通常是最好的解决办法。但有时候这不太可能,比如你可能在从一个慢速的文件共享读取文件,这个过程偶尔会卡住30秒。为了处理这种情况,你需要重新设计整个程序,围绕一个异步输入输出循环来进行。

如果你不需要跨平台支持,可以在*类Unix系统(包括Mac和Linux)上使用信号,在Windows上使用APC等。但如果需要跨平台,这些方法就不适用了。

所以,如果你真的需要并发处理,你可以这样做,有时候你也必须这样做。在这种情况下,你可能想用进程,而不是线程。因为你不能安全地终止一个线程,但你可以终止一个进程,而且可以做到尽可能安全。此外,如果线程因为CPU占用过高而需要5秒以上的时间,你也不想和它争夺全局解释器锁(GIL)。

这里有两个基本的选择。


首先,你可以把代码放在另一个脚本里,然后用subprocess来运行它:

subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)

由于这是通过正常的子进程通道进行的,所以你能用的通信方式只有一些argv字符串,一个成功/失败的返回值(实际上是一个小整数,但这也没好到哪里去),以及可选的输入和输出文本。


另外,你可以使用multiprocessing来生成一个类似线程的子进程:

p = multiprocessing.Process(func, args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()

如你所见,这种方法稍微复杂一些,但在几个方面更好:

  • 你可以传递任意的Python对象(至少是可以被序列化的对象),而不仅仅是字符串。
  • 你不需要把目标代码放在一个完全独立的脚本中,可以把它保留在同一个脚本中的一个函数里。
  • 它更灵活,比如如果你以后需要传递进度更新,添加一个队列在任一方向上都很简单。

任何形式的并行处理都面临一个大问题,就是共享可变数据,比如让一个后台任务更新全局字典(你的评论中提到你想这样做)。使用线程时,你可以勉强做到,但竞争条件可能导致数据损坏,所以你必须非常小心地使用锁。而使用子进程时,你根本无法做到这一点。(是的,你可以使用共享内存,正如在进程间共享状态中所解释的,但这仅限于简单类型,比如数字、固定数组和你知道如何定义为C结构的类型,这样又回到了线程面临的同样问题。)


理想情况下,你应该安排好事情,这样在进程运行时就不需要共享任何数据——你传入一个dict作为参数,得到一个dict作为结果。通常,当你有一个之前是同步的函数想要放到后台时,这样安排是相对简单的。

但如果说,部分结果比没有结果要好呢?在这种情况下,最简单的解决办法是通过队列传递结果。你可以使用显式队列,正如在进程间交换对象中所解释的,但还有一种更简单的方法。

如果你能把这个庞大的进程拆分成独立的任务,每个任务处理你想放入字典的一个值(或一组值),你可以在一个Pool上调度它们——甚至更好的是,使用concurrent.futures.Executor。(如果你使用的是Python 2.x或3.1,可以查看PyPI上的后移植版futures。)

假设你的慢函数是这样的:

def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d.setdefault(meat, 0) += count

你可以这样做:

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_canned_meats(), timeout=5)
    for (meat, count) in results:
        d.setdefault(meat, 0) += count

在5秒内得到的所有结果都会添加到字典中;如果没有得到所有结果,其余的就会被放弃,并且会抛出一个TimeoutError(你可以根据自己的需要处理,比如记录日志、执行一些快速的回退代码等等)。

如果这些任务确实是独立的(就像我这个简单的例子一样,但在你的实际代码中可能并非如此,至少不需要进行重大重构),你可以通过去掉max_workers=1来免费实现并行处理。这样,如果你在一台8核机器上运行,它会启动8个工作进程,每个进程处理1/8的工作,事情就会更快完成。(通常不会快8倍,但通常会快3-6倍,这仍然相当不错。)

16

也许有人会觉得这个装饰器有用,这是基于TheSoundDefense的回答:

import time
import signal

class TimeoutException(Exception):   # Custom exception class
    pass


def break_after(seconds=2):
    def timeout_handler(signum, frame):   # Custom signal handler
        raise TimeoutException
    def function(function):
        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                res = function(*args, **kwargs)
                signal.alarm(0)      # Clear alarm
                return res
            except TimeoutException:
                print u'Oops, timeout: %s sec reached.' % seconds, function.__name__, args, kwargs
            return
        return wrapper
    return function

测试:

@break_after(3)
def test(a, b, c):
    return time.sleep(10)

>>> test(1,2,3)
Oops, timeout: 3 sec reached. test (1, 2, 3) {}
50

我觉得创建一个新进程可能有点过于复杂。如果你使用的是Mac或者基于Unix的系统,你可以使用signal.SIGALRM来强制让一些运行太久的函数超时。这种方法适用于那些因为网络问题或者其他你无法通过修改函数来解决的问题而闲置的函数。我在这个回答中有个例子:

SSH有办法在短时间后超时吗?ClientAlive和ConnectTimeout似乎无法满足我的需求

我在这里编辑我的回答,虽然我不太确定这样做是否合适:

import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):   # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)    
    # This try/except loop ensures that 
    #   you'll catch TimeoutException when it's sent.
    try:
        A(i) # Whatever your function that might hang
    except TimeoutException:
        continue # continue the for loop if function A takes more than 5 second
    else:
        # Reset the alarm
        signal.alarm(0)

这个代码基本上是设置一个5秒的计时器,然后尝试执行你的代码。如果在时间到之前没有完成,就会发送一个SIGALRM信号,我们会捕获这个信号并把它转化为一个TimeoutException。这样就会强制程序进入异常处理的部分,继续执行后面的代码。

撰写回答