<p>我认为您可能需要让您的<code>add_task</code>方法知道它是否从事件循环以外的线程调用。这样,如果它是从同一线程调用的,您可以直接调用<code>asyncio.async</code>,否则,它可以做一些额外的工作将任务从循环线程传递到调用线程。下面是一个例子:</p>
<pre><code>import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we're not going between threads.
else:
# We're in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it's ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()
</code></pre>
<p>首先,我们将事件循环的线程id保存在<code>run</code>方法中,这样我们就可以确定稍后对<code>add_task</code>的调用是否来自其他线程。如果<code>add_task</code>是从非事件循环线程调用的,我们使用<code>call_soon_threadsafe</code>调用一个同时调度协程的函数,然后使用<code>concurrent.futures.Future</code>将任务传递回调用线程,该线程等待<code>Future</code>的结果。</p>
<p>关于取消任务的说明:当您在<code>Task</code>上调用<code>cancel</code>时,下次运行事件循环时,将在协程中引发<code>CancelledError</code>。这意味着,任务正在包装的协程将在下次到达屈服点时由于异常而中止,除非协程捕获<code>CancelledError</code>,并阻止其自身中止。还要注意,这只在被包装的函数实际上是一个可中断的协程的情况下才有效;例如,由<code>BaseEventLoop.run_in_executor</code>返回的<code>asyncio.Future</code>实际上不能被取消,因为它实际上被包装在<code>concurrent.futures.Future</code>上,并且一旦它们的底层函数真正开始执行,这些函数就不能被取消。在这些情况下,<code>asyncio.Future</code>会说它被取消了,但是实际在执行器中运行的函数将继续运行。</p>
<p><strong>编辑:</strong>根据Andrew Svetlov的建议,更新了第一个示例以使用<code>concurrent.futures.Future</code>,而不是<code>queue.Queue</code>。</p>
<p>注意:由于版本3.4.4使用<a href="https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future" rel="noreferrer">^{<cd20>}</a>,因此不推荐使用<a href="https://docs.python.org/3/library/asyncio-task.html#asyncio.async" rel="noreferrer">^{<cd2>}</a>。</p>