asyncio:可以取消由Executor运行的Future吗?
我想在一个执行器中启动一个阻塞函数,使用 asyncio 的 call loop.run_in_executor,然后再取消它,但这似乎对我不起作用。
这是代码:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
我本以为上面的代码只会让阻塞函数输出:
blocking 0/5
blocking 1/5
然后看到非阻塞函数的输出。但是实际上,即使我已经取消了,阻塞的未来依然在继续。
这可能吗?有没有其他方法可以做到这一点?
谢谢
编辑:关于使用 asyncio 运行阻塞和非阻塞代码的更多讨论: 如何将阻塞和非阻塞代码与 asyncio 结合使用
2 个回答
因为线程共享同一个进程的内存地址空间,所以没有安全的方法来终止一个正在运行的线程。这就是为什么大多数编程语言不允许直接杀死正在运行的线程(虽然有很多不太优雅的解决方法来绕过这个限制)。
Java在这方面吃了很多苦头。
一个解决办法是把你的函数放在一个单独的进程中运行,而不是线程,这样可以优雅地结束它。
Pebble库提供了一个类似于concurrent.futures
的接口,支持可以被取消的Futures
。
from pebble import ProcessPool
def function(foo, bar=0):
return foo + bar
with ProcessPool() as pool:
future = pool.schedule(function, args=[1])
# if running, the container process will be terminated
# a new process will be started consuming the next task
future.cancel()
在这种情况下,一旦你开始运行一个 Future
,就没有办法取消它,因为你依赖的是 concurrent.futures.Future
的行为,而它的文档中说明了以下内容:
cancel()
尝试取消这个调用。如果这个调用正在执行中并且无法取消,那么这个方法会返回
False
,否则这个调用会被取消,方法会返回True
。
所以,只有在任务还在 Executor
中待处理时,取消才会成功。现在,你实际上是使用一个 asyncio.Future
,它包裹了一个 concurrent.futures.Future
,在实际操作中,如果你在调用 cancel()
后尝试 yield from
这个 asyncio.Future
,它会抛出一个 CancellationError
,即使底层的任务实际上已经在运行了。但它并不会真正取消 Executor
中任务的执行。
如果你需要真正取消这个任务,你需要使用一种更传统的方法来中断在线程中运行的任务。具体怎么做取决于你的使用场景。对于你在例子中展示的使用场景,你可以使用 threading.Event
:
def blocking_func(seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel() # Mark Future as cancelled
event.set() # Actually interrupt blocking_func