我有一个FastAPI端点,它需要从HDFS下载一些文件到本地服务器
我正在尝试使用asyncio运行函数,该函数将在单独的进程中下载文件
我正在使用FastAPI创建一个HDFS客户机,并在端点执行中注入对象
from fastapi import Depends, FastAPI, Request, Response, status
from hdfs import InsecureClient
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
app = FastAPI()
HDFS_URLS = ['http://hdfs-srv.local:50070']
async def run_in_process(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
def connectHDFS():
client = InsecureClient(url)
yield client
def fr(id, img, client):
# my code here
client.download(id_identifica_foto_dir_hdfs, id_identifica_foto_dir_local, True, n_threads=2)
# my code here
return jsonReturn
@app.post("/")
async def main(request: Request, hdfsclient: InsecureClient = Depends(connectHDFS)):
# Decode the received message
data = await request.json()
message = base64.b64decode(data['data']).decode('utf-8').replace("'", '"')
message = json.loads(message)
res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
return {
"message": res
}
@app.on_event("startup")
async def on_startup():
app.state.executor = ProcessPoolExecutor()
@app.on_event("shutdown")
async def on_shutdown():
app.state.executor.shutdown()
但是我不能向前传递hdfsclient
对象:
res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
我得到以下错误:
Traceback (most recent call last):
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 202, in app
dependant=dependant, values=values, is_coroutine=is_coroutine
File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File "./asgi.py", line 86, in main
res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
File "./asgi.py", line 22, in run_in_process
return await loop.run_in_executor(app.state.executor, fn, *args) # wait and return result
File "/usr/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
如何在def fr()
函数中使用hdfsclient
,而无需在每个新请求上创建新连接?我的意思是,如何在应用程序启动时创建hdfsclient
,并能够在函数内部使用它
整个
asyncio
的要点就是在同一个过程中做你想做的事情。典型的例子是一个web爬虫,您可以在同一线程/进程中打开多个请求,然后等待它们完成。这样,您就可以从多个
urls
获取数据,而无需在启动下一个请求之前等待每个请求这同样适用于您的情况:调用下载文件的
async
函数,完成您的工作,然后等待文件下载完成(如果尚未完成)。在进程之间共享数据并不是一件小事,因此您的函数无法正常工作我建议你先了解什么是
async
以及它是如何工作的,然后再开始做你不懂的事情关于
asyncio
https://www.datacamp.com/community/tutorials/asyncio-introduction
https://realpython.com/lessons/what-asyncio/
https://docs.python.org/3/library/asyncio.html
相关问题 更多 >
编程相关推荐