使用异常取消长函数

2024-05-23 21:15:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我有execute函数,它调用一些非常长的任务。我想有可能取消这些任务。我想抛出一个异常并在外面捕捉它。问题是,我的异常在future之外不可见。如何修复此代码以正确取消

def cancel_task(self):
   while True:
      process = get_process()
      if process.status == "CANCELLED":
         raise OperationCancelledException()
      time.sleep(1)

def execute(self):
   with ThreadPoolExecutor() as executor:
      future = executor.submit(self.cancel_task)
      result = some_very_long_task()
      future.cancel()

   return result
# somewhere in the code where execute() is called
try:
   execute()
except OperationCancelledException:
   pass
except Exception:
   raise

Tags: 函数代码selftaskexecutedeffutureresult
1条回答
网友
1楼 · 发布于 2024-05-23 21:15:35

已在此处初始化future对象:

future = executor.submit(self.cancel_task)

然后,我们运行长任务:

result = some_very_long_task()

some_very_long_task()的调用将首先运行到完成,然后再转到下一行。无论future对象此时是否引发异常,它都不会影响当前执行。实际上,如文档所述,您必须显式调用Future.result(),以重新引发提交过程中发生的任何异常(这里是^{

class concurrent.futures.Future

result(timeout=None)

If the call raised an exception, this method will raise the same exception.

因此,即使你称之为:

future = executor.submit(self.cancel_task)
result = some_very_long_task()
future.result()

它只会在some_very_long_task()运行完成后运行并重新引发任何异常,因此毫无意义,因为它实际上没有取消/停止长任务的执行

还有一个补充说明,将来的对象一旦启动就不能取消,因为documented

cancel()

Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

即使将some_very_long_task提交给执行器并将timeout参数设置为result()也不会有帮助,因为它仍然会等待任务完成,只会在任务完成后超过超时时引发TimeoutError

替代解决方案

也许你会找到一种方法,但似乎concurrent.futures不是这个工作的工具。您可以考虑使用{a3}代替。

  1. some_very_long_task生成一个新进程
  2. 在后台运行该进程
  3. 当进程在后台运行时,请检查是否必须已取消该进程
  4. 如果过程已完成,则按常规进行
  5. 但是,如果进程尚未完成,并且我们已经收到取消它的信号,请终止进程
from datetime import datetime, timezone
import multiprocessing
import time

class OperationCancelledException(Exception):
    pass

def some_very_long_task(response):
    print("Start long task")
    time.sleep(10)  # Simulate the long task with a sleep
    print("Finished long task")
    response['response'] = "the response!"

def get_process_status():
    # Let's say the signal to cancel an ongoing process comes from a file.
    with open('status.txt') as status_file:
        return status_file.read()

def execute():
    response = multiprocessing.Manager().dict()

    proc = multiprocessing.Process(target=some_very_long_task, args=(response,), kwargs={})
    proc.start()

    while proc.is_alive():
        status = get_process_status()
        if status == "CANCELLED":
            proc.terminate()
            raise OperationCancelledException()
        time.sleep(1)

    proc.join()
    return response.get('response')

try:
    print(datetime.now(timezone.utc), "Script started")
    result = execute()
except OperationCancelledException:
   print("Operation cancelled")
else:
    print(result)
finally:
    print(datetime.now(timezone.utc), "Script ended")

如果status.txt包含"PENDING",则输出:

$ python3 script.py 
2021-09-08 13:17:32.234437+00:00 Script started
Start long task
Finished long task
the response!
2021-09-08 13:17:42.293814+00:00 Script ended
  • 如果没有取消的信号,任务将运行到完成(睡眠10秒)

如果status.txt包含"PENDING",然后在脚本运行时更改为"CANCELLED",则输出:

$ python3 script.py 
2021-09-08 13:19:13.370828+00:00 Script started
Start long task
Operation cancelled
2021-09-08 13:19:16.403367+00:00 Script ended
  • 如果有取消该任务的信号,则该任务在3秒后(文件更新时)立即停止

相关参考资料:

相关问题 更多 >