Python - 在同步方法中等待WebSocket响应
我遇到了一个关于Asyncio、网络套接字和同步调用的问题。
我们有一个使用网络套接字和Flask的应用程序。
网络套接字是通过asyncio来管理的,我们在这里接收消息:
async def on_message(message):
** some logic
await doStuff(message)
问题是我们的工作流程是这样的:我们有一个Flask的接口,需要执行某个操作,这个操作需要向网络套接字服务器发送请求,等待网络套接字的响应,然后再把同步的响应发送给控制器。
大概是这样的:
@app.route("/request", methods=["POST"])
def manageRequest():
data = request.get_json()
## send data to ws
ws.send(data)
## we need the response on the on_message method
response = {} ##ws response
makeSomething(response)
return newResponse
有没有办法在这个方法中等待异步响应,就像Java中的Completable那样?
1 个回答
0
有没有办法在方法中等待异步响应?
没有简单的方法可以做到这一点,特别是如果你想用一个简单的关键词来实现的话。所有的协程必须在事件循环中运行,只有协程才能使用 await
这个关键词。
asyncio.run
会在事件循环中运行你传入的协程,并在完成后返回结果,但它主要是用作整个程序或整个线程的入口点。调用它时会阻塞当前线程,直到协程完全完成,而且它还会为事件循环做一些设置和清理工作,所以用起来比较繁琐。
与事件循环交互的主要方法是使用 asyncio.run_coroutine_threadsafe
。注意,这需要你有对 asyncio
事件循环的引用。我对 Flask 不太熟悉,所以听起来你有一个线程在运行 WebSocket 服务器的事件循环。你需要获取那个循环的引用。
@app.route("/request", methods=["POST"])
def manageRequest():
data = request.get_json()
## send data to ws
# Call your coroutine on the running event loop. Have the
# coroutine return the response with `return ...`
future = asyncio.run_coroutine.threadsafe(ws.send(data), loop)
## we need the response on the on_message method
# Now, wait on the future's result to get the coroutine's
# returned result. You can use a timeout if you would like.
response = future.result() ##ws response
newResponse = makeSomething(response)
return newResponse
还有一种方法,如果你的协程和 WebSocket 服务器的设置不希望协程直接返回响应值,你 可以 使用标准的 queue.Queue
来从事件循环中获取数据。
@app.route("/request", methods=["POST"])
def manageRequest():
data = request.get_json()
## send data to ws
# create a one-element queue for the response
response_queue = queue.Queue(maxsize=1)
# Call your coroutine on the running event loop and also pass in
# your response queue for the reply. Inside the coroutine, process
# the call, and then place the response on the passed in queue
# using `Queue.put_nowait`.
asyncio.run_coroutine.threadsafe(ws.send(data, response_queue), loop)
## we need the response on the on_message method
# Now, wait on the queue to get filled with the response.
# You could also use a timeout if you'd like.
response = response.queue.get() ##ws response
newResponse = makeSomething(response)
return newResponse