Python - 在同步方法中等待WebSocket响应

1 投票
1 回答
36 浏览
提问于 2025-04-12 18:06

我遇到了一个关于Asyncio、网络套接字和同步调用的问题。

我们有一个使用网络套接字和Flask的应用程序。

网络套接字是通过asyncio来管理的,我们在这里接收消息:

async def on_message(message):
   ** some logic
   await doStuff(message)

问题是我们的工作流程是这样的:我们有一个Flask的接口,需要执行某个操作,这个操作需要向网络套接字服务器发送请求,等待网络套接字的响应,然后再把同步的响应发送给控制器。

大概是这样的:

@app.route("/request", methods=["POST"])
def manageRequest():
data = request.get_json()

## send data to ws
ws.send(data)

## we need the response on the on_message method

response = {} ##ws response
makeSomething(response)

return newResponse

有没有办法在这个方法中等待异步响应,就像Java中的Completable那样?

1 个回答

0

有没有办法在方法中等待异步响应?

没有简单的方法可以做到这一点,特别是如果你想用一个简单的关键词来实现的话。所有的协程必须在事件循环中运行,只有协程才能使用 await 这个关键词。

asyncio.run 会在事件循环中运行你传入的协程,并在完成后返回结果,但它主要是用作整个程序或整个线程的入口点。调用它时会阻塞当前线程,直到协程完全完成,而且它还会为事件循环做一些设置和清理工作,所以用起来比较繁琐。

与事件循环交互的主要方法是使用 asyncio.run_coroutine_threadsafe。注意,这需要你有对 asyncio 事件循环的引用。我对 Flask 不太熟悉,所以听起来你有一个线程在运行 WebSocket 服务器的事件循环。你需要获取那个循环的引用。

@app.route("/request", methods=["POST"])
def manageRequest():
    data = request.get_json()

    ## send data to ws
    # Call your coroutine on the running event loop. Have the
    # coroutine return the response with `return ...`
    future = asyncio.run_coroutine.threadsafe(ws.send(data), loop)

    ## we need the response on the on_message method

    # Now, wait on the future's result to get the coroutine's
    # returned result. You can use a timeout if you would like.
    response = future.result() ##ws response
    newResponse = makeSomething(response)

    return newResponse

还有一种方法,如果你的协程和 WebSocket 服务器的设置不希望协程直接返回响应值,你 可以 使用标准的 queue.Queue 来从事件循环中获取数据。

@app.route("/request", methods=["POST"])
def manageRequest():
    data = request.get_json()

    ## send data to ws
    # create a one-element queue for the response
    response_queue = queue.Queue(maxsize=1)
    # Call your coroutine on the running event loop and also pass in
    # your response queue for the reply. Inside the coroutine, process
    # the call, and then place the response on the passed in queue
    # using `Queue.put_nowait`.
    asyncio.run_coroutine.threadsafe(ws.send(data, response_queue), loop)

    ## we need the response on the on_message method

    # Now, wait on the queue to get filled with the response.
    # You could also use a timeout if you'd like.
    response = response.queue.get() ##ws response
    newResponse = makeSomething(response)

return newResponse

撰写回答