<p>您的错误是在调用<code>app.run()</code>之后尝试运行asyncio事件循环。后者不返回,而是运行Flask开发服务器</p>
<p>事实上,这就是大多数WSGI设置的工作方式;主线程将忙于调度请求,或者Flask服务器作为WSGI服务器中的模块导入,并且您也不能在这里启动事件循环</p>
<p>相反,您必须在单独的线程中运行asyncio事件循环<strong>,然后通过<a href="https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe" rel="nofollow noreferrer">^{<cd2>}</a>在该单独的线程中运行协程。请参阅文档中的<a href="https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading" rel="nofollow noreferrer"><em>Coroutines and Multithreading</em> section</a>了解这需要什么</p>
<p>下面是一个模块的实现,该模块将运行这样一个事件循环线程,并提供实用程序来计划在该循环中运行的协程:</p>
<pre><code>import asyncio
import itertools
import threading
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
self.started = threading.Event()
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.call_later(0, self.started.set)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
try:
shutdown_executor = loop.shutdown_default_executor()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_executor)
asyncio.set_event_loop(None)
loop.close()
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
if _loop_thread is None:
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
# give the thread up to a second to produce a loop
_loop_thread.started.wait(1)
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
"""Run the coroutine in the event loop running in a separate thread
Returns a Future, call Future.result() to get the output
"""
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
</code></pre>
<p>您可以使用此处定义的<code>run_coroutine()</code>函数来调度异步IO例程。使用返回的<a href="https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future" rel="nofollow noreferrer">^{<cd4>} instance</a>控制协同程序:</p>
<ul>
<li>使用<code>Future.result()</code>获取结果。你可以给它一个超时;如果在超时时间内未产生任何结果,则协同程序将自动取消</李>
<li>您可以使用<code>.cancelled()</code>、<code>.running()</code>和<code>.done()</code>方法查询协同程序的状态</李>
<li>您可以向将来添加回调,当协同程序完成、取消或引发异常时将调用回调(请考虑这可能是从事件循环线程调用的,而不是从中调用<code>run_coroutine()</code>的线程)</李>
</ul>
<p>对于您的特定示例,<code>abar()</code>不返回任何结果,您可以忽略返回的未来,如下所示:</p>
<pre><code>@app.route("/")
def notify():
run_coroutine(abar("abar"))
return "OK"
</code></pre>
<p>请注意,在Python3.8之前的<em>中,您不能使用在单独线程上运行的事件循环来创建子流程!请参阅我对<a href="https://stackoverflow.com/questions/58547753/python3-flask-asyncio-subprocess-in-route-hangs">Python3 Flask asyncio subprocess in route hangs</a>的回答,了解Python 3.8 <code>ThreadedChildWatcher</code>类的后端口</p>