如何在调用更新后端的函数时从python(fastapi)发送服务器端事件

2021-04-11 16:10:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下问题:给定一个运行fastapi的后端,它有一个流式端点,用于更新前端,我想在每次调用更新后端状态的函数时发送这些更新(这可能是由计划的作业引起的,也可能是被命中并导致状态更新的另一个端点)。在

我希望实现的简化版本是:

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be update
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")

我想让它永远保持运行。每当状态更新时,event_stream就会取消阻止并发送消息。在

我看了一个线程和异步,但是我觉得我缺少了一些在python中如何做到这一点的简单概念。在

1条回答
网友
1楼 ·

我能找到的解决这个问题的最简单的方法是使用threading.Condition。在

因此它变成:

import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        with condition:
            condition.notify()

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                condition.wait()

            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")




相关问题