Flask流媒体时未停止的进程视图
我在用一个简单的Flask应用程序,配合gunicorn的gevent工作进程来处理服务器推送事件。
为了进行内容流传输,我使用了:
response = Response(eventstream(), mimetype="text/event-stream")
这个代码是从redis中流式传输事件:
def eventstream():
for message in pubsub.listen():
# ...
yield str(event)
我通过以下方式部署:
gunicorn -k gevent -b 127.0.0.1:50008 flaskapplication
但是在使用一段时间后,我发现有50个redis连接是打开的,即使没有人再连接到服务器推送事件流了。
看起来这个视图没有结束,因为gunicorn是非阻塞的,而pubsub.listen()是阻塞的。
我该怎么解决这个问题呢?我应该限制gunicorn可以生成的进程数量,还是应该让flask在一段时间后结束这个视图?如果可能的话,它应该在没有活动时停止视图/redis连接,而不影响那些仍然连接到SSE流的用户。
2 个回答
2
你可以用 gunicorn
加上 -t <秒数>
来设置一个超时时间,这样如果你的工作进程在设定的时间内没有反应,就会被强制关闭。通常设置30秒就够了。我觉得这个方法可能能解决你的问题,但我不太确定。
根据我看到的情况,你也可以考虑把你的工作进程改写一下,使用 gevent
里的 Timeout
。
这可能看起来像下面这样:
from gevent import Timeout
def eventstream():
pubsub = redis.pubsub()
try:
with Timeout(30) as timeout:
pubsub.subscribe(channel)
for message in pubsub.listen():
# ...
yield str(event)
except Timeout, t:
if t is not timeout:
raise
else:
pubsub.unsubscribe(channel)
这个例子对理解这个怎么运作很有帮助。
0
使用natdempk的解决方案中的Timeout
对象,最优雅的方法是发送一个心跳信号,以检测是否有连接断开:
while True:
pubsub = redis.pubsub()
try:
with Timeout(30) as timeout:
for message in pubsub.listen():
# ...
yield str(event)
timeout.cancel()
timeout.start()
except Timeout, t:
if t is not timeout:
raise
else:
yield ":\n\n" # heartbeat
请注意,你需要再次调用redis.pubsub()
,因为在出现异常后,redis连接会丢失,这时你会遇到一个错误,提示NoneType object has no attribute readline
。