Flask流媒体时未停止的进程视图

5 投票
2 回答
1363 浏览
提问于 2025-04-18 06:42

我在用一个简单的Flask应用程序,配合gunicorn的gevent工作进程来处理服务器推送事件。

为了进行内容流传输,我使用了:

response = Response(eventstream(), mimetype="text/event-stream")

这个代码是从redis中流式传输事件:

def eventstream():
    for message in pubsub.listen():
        # ...
        yield str(event)

我通过以下方式部署:

gunicorn -k gevent -b 127.0.0.1:50008 flaskapplication

但是在使用一段时间后,我发现有50个redis连接是打开的,即使没有人再连接到服务器推送事件流了。

看起来这个视图没有结束,因为gunicorn是非阻塞的,而pubsub.listen()是阻塞的。

我该怎么解决这个问题呢?我应该限制gunicorn可以生成的进程数量,还是应该让flask在一段时间后结束这个视图?如果可能的话,它应该在没有活动时停止视图/redis连接,而不影响那些仍然连接到SSE流的用户。

2 个回答

2

你可以用 gunicorn 加上 -t <秒数> 来设置一个超时时间,这样如果你的工作进程在设定的时间内没有反应,就会被强制关闭。通常设置30秒就够了。我觉得这个方法可能能解决你的问题,但我不太确定。

根据我看到的情况,你也可以考虑把你的工作进程改写一下,使用 gevent 里的 Timeout

这可能看起来像下面这样:

from gevent import Timeout

def eventstream():
    pubsub = redis.pubsub()
    try:
        with Timeout(30) as timeout:
            pubsub.subscribe(channel)
            for message in pubsub.listen():
                # ...
                yield str(event)
    except Timeout, t:
        if t is not timeout:
            raise
        else:
            pubsub.unsubscribe(channel)

这个例子对理解这个怎么运作很有帮助。

0

使用natdempk的解决方案中的Timeout对象,最优雅的方法是发送一个心跳信号,以检测是否有连接断开:

while True:
    pubsub = redis.pubsub()
    try:
        with Timeout(30) as timeout:
            for message in pubsub.listen():
                # ...
                yield str(event)
                timeout.cancel()
                timeout.start()
    except Timeout, t:
        if t is not timeout:
            raise
        else:
            yield ":\n\n"  # heartbeat

请注意,你需要再次调用redis.pubsub(),因为在出现异常后,redis连接会丢失,这时你会遇到一个错误,提示NoneType object has no attribute readline

撰写回答