如何在 concurrent.futures 中使用超时?

36 投票
3 回答
42150 浏览
提问于 2025-04-16 20:27

我正在尝试在python3.2中使用concurrent.futures模块来实现超时功能。但是,当超时发生时,任务并没有真正停止执行。我尝试了线程和进程池两种方式,但无论哪种方式都无法停止任务,只有等任务完成后才会抛出超时错误。那么,有人知道怎么才能让这个功能正常工作吗?

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0;
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")

if __name__ == '__main__':
    main()

3 个回答

0

我的建议是使用线程池(ThreadPool),而不是 concurrent.futures。正如文档所说:

所有被放入线程池的线程在解释器退出之前都会被合并(joined)。请注意,执行这个合并的处理程序是在使用 atexit 添加的任何退出处理程序之前执行的。这意味着主线程中的异常必须被捕获并处理,以便能够通知其他线程正常退出。

在更复杂的情况下,整个程序可能会卡住。下面的代码片段虽然有点偏离问题,但足以说明我的意思:

import concurrent.futures, time, datetime
from multiprocessing.pool import ThreadPool

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    i = 0
    while True:
        last_number = i * i
        i += 1
    return last_number

def origin():
    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
            try:
                for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                    print(future.result(timeout=1))
            except concurrent.futures._base.TimeoutError:
                print("This took to long...") # It suspends infinitely.
    except:
        print('Ending from origin.')

def update():
    try:
        with ThreadPool(len(max_numbers)) as pool:
            result = pool.map_async(run_loop, max_numbers)
            for num in result.get(2):
                print(num)
    except:
        print('Ending from update.')

if __name__ == '__main__':
    origin()
    # update()
14

最近我也遇到了这个问题,最后我想出了一个解决方案,使用了 ProcessPoolExecutor


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")
            stop_process_pool(executor)

def stop_process_pool(executor):
    for pid, process in executor._processes.items():
        process.terminate()
    executor.shutdown()
35

根据我的理解,TimeoutError这个错误是在你预期的时候被触发的,而不是在任务完成后才出现。

不过,你的程序会继续运行,直到所有正在进行的任务都完成。这是因为当前正在执行的任务(在你的情况下,可能是你提交的所有任务,因为你的线程池大小和任务数量是一样的)并不会被“杀掉”。

TimeoutError被触发,是为了让你可以选择不等到任务完成(而去做其他事情),但任务会一直运行直到完成。而且只要你的执行器中还有未完成的任务,Python就不会退出。

据我所知,不能简单地“停止”当前正在执行的任务,你只能“取消”那些还没有开始的任务。在你的例子中,可能没有这样的任务,但想象一下你有5个线程/进程的池子,而你想处理100个项目。在某个时刻,可能会有20个任务完成,5个任务正在运行,还有75个任务被安排。在这种情况下,你可以取消那76个安排好的任务,但那4个正在运行的任务会继续执行,直到完成,无论你是否等待结果。

虽然不能这样做,但我想应该有其他方法可以达到你想要的结果。也许这个版本可以帮助你(不确定它是否完全符合你的需求,但可能会有一些用处):

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False

    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0;
        for i in xrange(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number

    def interrupt(self):
        self.interrupt_requested = True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()


if __name__ == '__main__':
    main()

通过为每个“任务”创建一个可调用的对象,并将这些对象传递给执行器,而不是直接传递一个普通的函数,你可以提供一种“中断”任务的方式。
小提示:去掉task.interrupt()这一行,看看会发生什么,这可能会让你更容易理解我上面长长的解释;-)

撰写回答