在超时内执行完成则从功能返回,否则回调

2024-06-12 02:50:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个Python3.5版本的项目,没有使用任何异步特性。我必须实现以下逻辑:

def should_return_in_3_sec(some_serious_job, arguments, finished_callback):
    # Start some_serious_job(*arguments) in a task
    # if it finishes within 3 sec:
    #    return result immediately
    # otherwise return None, but do not terminate task.
    # If the task finishes in 1 minute:
    #    call finished_callback(result)
    # else:
    #    call finished_callback(None)
    pass

函数should_return_in_3_sec()应该保持同步,但是由我来编写任何新的异步代码(包括some_serious_job())。在

什么是最优雅和Python式的方法?在


Tags: innonetaskreturncallbackjobsomesec
2条回答

threading模块有一些简单的超时选项,例如参见Thread.join(timeout)。在

如果您选择使用asyncio,下面是一个部分解决方案,可以满足您的一些需求:

import asyncio

import time


async def late_response(task, flag, timeout, callback):
    done, pending = await asyncio.wait([task], timeout=timeout)
    callback(done.pop().result() if done else None)  # will raise an exception if some_serious_job failed
    flag[0] = True  # signal some_serious_job to stop
    return await task


async def launch_job(loop, some_serious_job, arguments, finished_callback,
                     timeout_1=3, timeout_2=5):
    flag = [False]
    task = loop.run_in_executor(None, some_serious_job, flag, *arguments)
    done, pending = await asyncio.wait([task], timeout=timeout_1)
    if done:
        return done.pop().result()  # will raise an exception if some_serious_job failed
    asyncio.ensure_future(
        late_response(task, flag, timeout_2, finished_callback))
    return None


def f(flag, n):
    for i in range(n):
        print("serious", i, flag)
        if flag[0]:
            return "CANCELLED"
        time.sleep(1)
    return "OK"


def finished(result):
    print("FINISHED", result)


loop = asyncio.get_event_loop()
result = loop.run_until_complete(launch_job(loop, f, [1], finished))
print("result:", result)
loop.run_forever()

这将在一个单独的线程中运行作业(改为使用loop.set_executor(ProcessPoolExecutor())在进程中运行CPU密集型任务)。请记住,终止进程/线程是一种不好的做法-上面的代码使用一个非常简单的列表来通知线程停止(另请参见threading.Event/multiprocessing.Event)。在

在实现解决方案时,您可能会发现需要修改现有代码以使用coutroutines而不是使用线程。在

把一个执行严肃任务的线程分开,让它将结果写入队列,然后终止。从该队列读入主线程,超时3秒。如果超时,启动另一个线程并返回None。让第二个线程以一分钟的超时从队列中读取数据;如果超时,则调用finished_callback(None);否则调用finished_callback(result)。在

我是这样画的:

import threading, queue

def should_return_in_3_sec(some_serious_job, arguments, finished_callback):
  result_queue = queue.Queue(1)

  def do_serious_job_and_deliver_result():
    result = some_serious_job(arguments)
    result_queue.put(result)

  threading.Thread(target=do_serious_job_and_deliver_result).start()

  try:
    result = result_queue.get(timeout=3)
  except queue.Empty:  # timeout?

    def expect_and_handle_late_result():
      try:
        result = result_queue.get(timeout=60)
      except queue.Empty:
        finished_callback(None)
      else:
        finished_callback(result)

    threading.Thread(target=expect_and_handle_late_result).start()
    return None
  else:
    return result

相关问题 更多 >