如何在 concurrent.futures 中使用超时?
我正在尝试在python3.2中使用concurrent.futures模块来实现超时功能。但是,当超时发生时,任务并没有真正停止执行。我尝试了线程和进程池两种方式,但无论哪种方式都无法停止任务,只有等任务完成后才会抛出超时错误。那么,有人知道怎么才能让这个功能正常工作吗?
import concurrent.futures
import time
import datetime
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
def run_loop(max_number):
print("Started:", datetime.datetime.now(), max_number)
last_number = 0;
for i in range(1, max_number + 1):
last_number = i * i
return last_number
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
try:
for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
print(future.result(timeout=1))
except concurrent.futures._base.TimeoutError:
print("This took to long...")
if __name__ == '__main__':
main()
3 个回答
我的建议是使用线程池(ThreadPool),而不是 concurrent.futures
。正如文档所说:
所有被放入线程池的线程在解释器退出之前都会被合并(joined)。请注意,执行这个合并的处理程序是在使用 atexit 添加的任何退出处理程序之前执行的。这意味着主线程中的异常必须被捕获并处理,以便能够通知其他线程正常退出。
在更复杂的情况下,整个程序可能会卡住。下面的代码片段虽然有点偏离问题,但足以说明我的意思:
import concurrent.futures, time, datetime
from multiprocessing.pool import ThreadPool
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
def run_loop(max_number):
print("Started:", datetime.datetime.now(), max_number)
last_number = 0
i = 0
while True:
last_number = i * i
i += 1
return last_number
def origin():
try:
with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
try:
for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
print(future.result(timeout=1))
except concurrent.futures._base.TimeoutError:
print("This took to long...") # It suspends infinitely.
except:
print('Ending from origin.')
def update():
try:
with ThreadPool(len(max_numbers)) as pool:
result = pool.map_async(run_loop, max_numbers)
for num in result.get(2):
print(num)
except:
print('Ending from update.')
if __name__ == '__main__':
origin()
# update()
最近我也遇到了这个问题,最后我想出了一个解决方案,使用了 ProcessPoolExecutor
:
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
try:
for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
print(future.result(timeout=1))
except concurrent.futures._base.TimeoutError:
print("This took to long...")
stop_process_pool(executor)
def stop_process_pool(executor):
for pid, process in executor._processes.items():
process.terminate()
executor.shutdown()
根据我的理解,TimeoutError这个错误是在你预期的时候被触发的,而不是在任务完成后才出现。
不过,你的程序会继续运行,直到所有正在进行的任务都完成。这是因为当前正在执行的任务(在你的情况下,可能是你提交的所有任务,因为你的线程池大小和任务数量是一样的)并不会被“杀掉”。
TimeoutError被触发,是为了让你可以选择不等到任务完成(而去做其他事情),但任务会一直运行直到完成。而且只要你的执行器中还有未完成的任务,Python就不会退出。
据我所知,不能简单地“停止”当前正在执行的任务,你只能“取消”那些还没有开始的任务。在你的例子中,可能没有这样的任务,但想象一下你有5个线程/进程的池子,而你想处理100个项目。在某个时刻,可能会有20个任务完成,5个任务正在运行,还有75个任务被安排。在这种情况下,你可以取消那76个安排好的任务,但那4个正在运行的任务会继续执行,直到完成,无论你是否等待结果。
虽然不能这样做,但我想应该有其他方法可以达到你想要的结果。也许这个版本可以帮助你(不确定它是否完全符合你的需求,但可能会有一些用处):
import concurrent.futures
import time
import datetime
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
class Task:
def __init__(self, max_number):
self.max_number = max_number
self.interrupt_requested = False
def __call__(self):
print("Started:", datetime.datetime.now(), self.max_number)
last_number = 0;
for i in xrange(1, self.max_number + 1):
if self.interrupt_requested:
print("Interrupted at", i)
break
last_number = i * i
print("Reached the end")
return last_number
def interrupt(self):
self.interrupt_requested = True
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
tasks = [Task(num) for num in max_numbers]
for task, future in [(i, executor.submit(i)) for i in tasks]:
try:
print(future.result(timeout=1))
except concurrent.futures.TimeoutError:
print("this took too long...")
task.interrupt()
if __name__ == '__main__':
main()
通过为每个“任务”创建一个可调用的对象,并将这些对象传递给执行器,而不是直接传递一个普通的函数,你可以提供一种“中断”任务的方式。
小提示:去掉task.interrupt()
这一行,看看会发生什么,这可能会让你更容易理解我上面长长的解释;-)