asyncio:可以取消由Executor运行的Future吗?

33 投票
2 回答
20138 浏览
提问于 2025-04-28 00:39

我想在一个执行器中启动一个阻塞函数,使用 asyncio 的 call loop.run_in_executor,然后再取消它,但这似乎对我不起作用。

这是代码:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我本以为上面的代码只会让阻塞函数输出:

blocking 0/5
blocking 1/5

然后看到非阻塞函数的输出。但是实际上,即使我已经取消了,阻塞的未来依然在继续。

这可能吗?有没有其他方法可以做到这一点?

谢谢

编辑:关于使用 asyncio 运行阻塞和非阻塞代码的更多讨论: 如何将阻塞和非阻塞代码与 asyncio 结合使用

暂无标签

2 个回答

2

因为线程共享同一个进程的内存地址空间,所以没有安全的方法来终止一个正在运行的线程。这就是为什么大多数编程语言不允许直接杀死正在运行的线程(虽然有很多不太优雅的解决方法来绕过这个限制)。

Java在这方面吃了很多苦头。

一个解决办法是把你的函数放在一个单独的进程中运行,而不是线程,这样可以优雅地结束它。

Pebble库提供了一个类似于concurrent.futures的接口,支持可以被取消的Futures

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  
32

在这种情况下,一旦你开始运行一个 Future,就没有办法取消它,因为你依赖的是 concurrent.futures.Future 的行为,而它的文档中说明了以下内容

cancel()

尝试取消这个调用。如果这个调用正在执行中并且无法取消,那么这个方法会返回 False,否则这个调用会被取消,方法会返回 True

所以,只有在任务还在 Executor 中待处理时,取消才会成功。现在,你实际上是使用一个 asyncio.Future,它包裹了一个 concurrent.futures.Future,在实际操作中,如果你在调用 cancel() 后尝试 yield from 这个 asyncio.Future,它会抛出一个 CancellationError,即使底层的任务实际上已经在运行了。但它并不会真正取消 Executor 中任务的执行。

如果你需要真正取消这个任务,你需要使用一种更传统的方法来中断在线程中运行的任务。具体怎么做取决于你的使用场景。对于你在例子中展示的使用场景,你可以使用 threading.Event

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func

撰写回答