从Flask路由进行Python异步IO调用

2024-04-16 17:07:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在每次执行Flask路由时执行一个异步函数。为什么abar函数从未执行过

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()

我还尝试将阻塞调用放在单独的线程中。但是它仍然没有调用abar函数

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)

Tags: 函数runnamefromimportloopeventasyncio
3条回答

您的错误是在调用app.run()之后尝试运行asyncio事件循环。后者不返回,而是运行Flask开发服务器

事实上,这就是大多数WSGI设置的工作方式;主线程将忙于调度请求,或者Flask服务器作为WSGI服务器中的模块导入,并且您也不能在这里启动事件循环

相反,您必须在单独的线程中运行asyncio事件循环,然后通过^{}在该单独的线程中运行协程。请参阅文档中的Coroutines and Multithreading section了解这需要什么

下面是一个模块的实现,该模块将运行这样一个事件循环线程,并提供实用程序来计划在该循环中运行的协程:

import asyncio
import itertools
import threading

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        self.started = threading.Event()
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.call_later(0, self.started.set)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            try:
                shutdown_executor = loop.shutdown_default_executor()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_executor)
            asyncio.set_event_loop(None)
            loop.close()

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread

    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                _loop_thread.started.wait(1)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    """Run the coroutine in the event loop running in a separate thread

    Returns a Future, call Future.result() to get the output

    """
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

您可以使用此处定义的run_coroutine()函数来调度异步IO例程。使用返回的^{} instance控制协同程序:

  • 使用Future.result()获取结果。你可以给它一个超时;如果在超时时间内未产生任何结果,则协同程序将自动取消
  • 您可以使用.cancelled().running().done()方法查询协同程序的状态
  • 您可以向将来添加回调,当协同程序完成、取消或引发异常时将调用回调(请考虑这可能是从事件循环线程调用的,而不是从中调用run_coroutine()的线程)

对于您的特定示例,abar()不返回任何结果,您可以忽略返回的未来,如下所示:

@app.route("/")
def notify():
    run_coroutine(abar("abar"))
    return "OK"

请注意,在Python3.8之前的中,您不能使用在单独线程上运行的事件循环来创建子流程!请参阅我对Python3 Flask asyncio subprocess in route hangs的回答,了解Python 3.8 ThreadedChildWatcher类的后端口

对于你的问题,一个更简单的解决方案(在我有偏见的观点中)是从Flask切换到Quart。如果是这样,您的代码片段将简化为

import asyncio
from quart import Quart

async def abar(a):
    print(a)

app = Quart(__name__)

@app.route("/")
async def notify():
    await abar("abar")
    return "OK"

if __name__ == "__main__":
    app.run(debug=False)

如其他答案中所述,Flask应用程序运行被阻塞,并且不与asyncio循环交互。另一方面,Quart是基于asyncio构建的FlaskAPI,因此它应该按照您所期望的方式工作

作为更新,Flask Aiohttp不再是maintained

您可以将一些异步功能合并到Flask应用程序中,而无需将它们完全转换为asyncio

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    loop.run_until_complete(abar("abar"))
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

这将阻止Flask响应,直到异步函数返回,但它仍然允许您做一些聪明的事情。我使用这个模式使用aiohttp并行执行了许多外部请求,当它们完成后,我又回到传统的flask中进行数据处理和模板呈现

import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

def fight(responses):
    return "Why can't we all just get along?"

@app.route("/")
def index():
    # perform multiple async requests concurrently
    responses = loop.run_until_complete(asyncio.gather(
        fetch("https://google.com/"),
        fetch("https://bing.com/"),
        fetch("https://duckduckgo.com"),
        fetch("http://www.dogpile.com"),
    ))

    # do something with the results
    return fight(responses)

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

相关问题 更多 >