在一定时间后中断该功能

2024-03-29 12:31:50 发布

您现在位置:Python中文网/ 问答频道 /正文

在Python中,以玩具为例:

for x in range(0, 3):
    # Call function A(x)

如果函数A需要5秒以上的时间,我想通过跳过它来继续for循环,这样我就不会被卡住或浪费时间

通过进行一些搜索,我意识到子进程或线程可能会有所帮助,但我不知道如何在这里实现它


Tags: 函数infor进程时间rangefunctioncall
3条回答

如果你能把工作分解,经常检查,那几乎总是最好的解决方案。但有时这是不可能的,例如,也许你正在从一个缓慢的文件共享中读取一个文件,而这个文件每隔一段时间就会挂起30秒。要在内部处理这个问题,您必须围绕异步I/O循环重新构造整个程序

如果你不需要跨平台,你可以在*nix(包括Mac和Linux)上使用信号,在Windows上使用APCs,等等。但是如果你需要跨平台,那是行不通的

所以,如果你真的需要同时做,你可以,有时你不得不。在这种情况下,您可能希望为此使用进程,而不是线程。你不能真正安全地杀死一个线程,但你可以杀死一个进程,它可以像你希望的那样安全。另外,如果线程由于CPU限制而占用了5秒以上的时间,那么您不希望为了GIL而与它发生冲突

这里有两个基本选项


首先,您可以将代码放入另一个脚本中,并使用subprocess运行它:

subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)

因为这是通过正常的子进程通道进行的,所以您可以使用的唯一通信是一些argv字符串、成功/失败返回值(实际上是一个小整数,但这并不是更好),以及可选的一大块文本输入和一大块文本输出


或者,您可以使用multiprocessing生成类似线程的子进程:

p = multiprocessing.Process(func, args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()

正如您所见,这有点复杂,但在以下几个方面更好:

  • 您可以传递任意Python对象(至少是可以pickle的任何对象),而不仅仅是字符串
  • 不必将目标代码放在完全独立的脚本中,您可以将其作为函数留在同一脚本中
  • 它更灵活,例如,如果您以后需要(比如)传递进度更新,则很容易在任一方向或两个方向添加队列

任何一种并行性的最大问题都是共享可变数据——例如,让后台任务更新一个全局字典作为其工作的一部分(您的评论说您正在尝试这样做)。对于线程,您可以侥幸逃脱,但竞争条件可能会导致数据损坏,因此必须非常小心锁定。对于子进程,您根本无法逃脱。(是的,正如Sharing state between processes所解释的,您可以使用共享内存,但这仅限于简单的类型,如数字、固定数组和您知道如何定义为C结构的类型,这只会让您回到与线程相同的问题。)


理想情况下,您可以安排一些事情,以便在进程运行时不需要共享任何数据。您可以传入一个dict作为参数,然后返回一个dict。这通常是很容易安排,当你有一个以前的同步功能,你想把在后台

但如果,比如说,部分结果比没有结果好呢?在这种情况下,最简单的解决方案是通过队列传递结果。如Exchanging objects between processes中所述,您可以使用显式队列来实现这一点,但有一种更简单的方法

如果您可以将整个流程分解为单独的任务,每个任务对应于您希望保留在字典中的值(或值组),那么您可以将它们安排在Pool上,或者更好的是,安排在^{}上。(如果您使用的是Python2.x或3.1,请参阅PyPI上的backport^{}。)

假设您的慢速函数如下所示:

def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d.setdefault(meat, 0) += count

相反,您应该这样做:

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_canned_meats(), timeout=5)
    for (meat, count) in results:
        d.setdefault(meat, 0) += count

在5秒内得到的结果会被添加到dict中;如果这还不是全部,那么剩下的部分将被放弃,并引发一个TimeoutError(您可以按照自己的意愿处理它,执行一些快速回退代码,不管怎样)

如果任务真的是独立的(就像我愚蠢的小例子中的那样,但当然它们可能不在你真正的代码中,至少在没有重大重新设计的情况下),你可以通过删除tmax_workers=1。然后,如果你在8核机器上运行它,它将启动8个工人,并给他们每八分之一的工作要做,事情会做得更快。(通常速度不是8倍,但通常是3-6倍,这仍然很好。)

根据声音防御的回答,也许有人觉得这个装饰师很有用:

import time
import signal

class TimeoutException(Exception):   # Custom exception class
    pass


def break_after(seconds=2):
    def timeout_handler(signum, frame):   # Custom signal handler
        raise TimeoutException
    def function(function):
        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                res = function(*args, **kwargs)
                signal.alarm(0)      # Clear alarm
                return res
            except TimeoutException:
                print u'Oops, timeout: %s sec reached.' % seconds, function.__name__, args, kwargs
            return
        return wrapper
    return function

测试:

@break_after(3)
def test(a, b, c):
    return time.sleep(10)

>>> test(1,2,3)
Oops, timeout: 3 sec reached. test (1, 2, 3) {}

我认为创建一个新的流程可能有些过分。如果您使用的是Mac或基于Unix的系统,则应该能够使用signal.SIGALRM强制超时耗时过长的函数。这将适用于因网络或其他问题而处于空闲状态的函数,这些问题您完全无法通过修改函数来处理。我有一个在这个答案中使用它的例子:

Option for SSH to timeout after a short time? ClientAlive & ConnectTimeout don't seem to do what I need them to do

在这里编辑我的答案,虽然我不确定我应该这样做:

import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):   # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)    
    # This try/except loop ensures that 
    #   you'll catch TimeoutException when it's sent.
    try:
        A(i) # Whatever your function that might hang
    except TimeoutException:
        continue # continue the for loop if function A takes more than 5 second
    else:
        # Reset the alarm
        signal.alarm(0)

这基本上会设置一个5秒的计时器,然后尝试执行代码。如果未能在时间结束前完成,将发送一个SIGALRM,我们将捕获它并将其转换为TimeoutException。这将迫使您转到except块,在该块中您的程序可以继续

相关问题 更多 >