如何将FastAPI依赖对象与asyncio一起使用?

2024-03-28 14:28:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个FastAPI端点,它需要从HDFS下载一些文件到本地服务器

我正在尝试使用asyncio运行函数,该函数将在单独的进程中下载文件

我正在使用FastAPI创建一个HDFS客户机,并在端点执行中注入对象

from fastapi import Depends, FastAPI, Request, Response, status
from hdfs import InsecureClient
import asyncio
from concurrent.futures.process import ProcessPoolExecutor

app = FastAPI()

HDFS_URLS = ['http://hdfs-srv.local:50070']

async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result

def connectHDFS():
    client = InsecureClient(url)
    yield client

def fr(id, img, client):
    # my code here

    client.download(id_identifica_foto_dir_hdfs,  id_identifica_foto_dir_local, True, n_threads=2)

    # my code here

    return jsonReturn


@app.post("/")
async def main(request: Request, hdfsclient: InsecureClient = Depends(connectHDFS)):

    # Decode the received message
    data = await request.json()
    message = base64.b64decode(data['data']).decode('utf-8').replace("'", '"')
    message = json.loads(message)

    res = await run_in_process(fr, message['id'], message['img'], hdfsclient)

    return {
        "message": res
    }

@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()

@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

但是我不能向前传递hdfsclient对象:

res = await run_in_process(fr, message['id'], message['img'], hdfsclient)

我得到以下错误:

Traceback (most recent call last):
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/applications.py", line 199, in __call__
    await super().__call__(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
    await self.app(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 202, in app
    dependant=dependant, values=values, is_coroutine=is_coroutine
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
    return await dependant.call(**values)
  File "./asgi.py", line 86, in main
    res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
  File "./asgi.py", line 22, in run_in_process
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

如何在def fr()函数中使用hdfsclient,而无需在每个新请求上创建新连接?我的意思是,如何在应用程序启动时创建hdfsclient,并能够在函数内部使用它


Tags: inpyappmessagehomelibpackagesline
1条回答
网友
1楼 · 发布于 2024-03-28 14:28:26

整个asyncio的要点就是在同一个过程中做你想做的事情。

典型的例子是一个web爬虫,您可以在同一线程/进程中打开多个请求,然后等待它们完成。这样,您就可以从多个urls获取数据,而无需在启动下一个请求之前等待每个请求

这同样适用于您的情况:调用下载文件的async函数,完成您的工作,然后等待文件下载完成(如果尚未完成)。在进程之间共享数据并不是一件小事,因此您的函数无法正常工作

我建议你先了解什么是async以及它是如何工作的,然后再开始做你不懂的事情

关于asyncio

https://www.datacamp.com/community/tutorials/asyncio-introduction

https://realpython.com/lessons/what-asyncio/

https://docs.python.org/3/library/asyncio.html

相关问题 更多 >