Flask应用中后台线程记录到Server Sent Event流
我正在尝试构建一个Flask应用程序,这个应用程序需要在后台运行一些任务。这个任务(我们称之为工作者)使用标准的logging
模块来记录发生的事情。我想使用服务器推送事件(Server Sent Events)将日志信息直接推送到网页浏览器,但我无法通过gevent
来实现这个广播功能。
在下面的代码片段中,工作者启动得很正常,SSEHandler.emit
方法也按预期被调用,但在我执行gevent.spawn
之后,notify
函数似乎没有被执行。
main.py
import gevent
from gevent.wsgi import WSGIServer
from gevent.queue import Queue
from flask import Flask, Response
import time
import logging
import threading
from worker import Worker
class SSEHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
self.subscriptions = []
def emit(self, record):
try:
msg = self.format(record)
print "sending", msg
def notify(subs, msg):
print "broadcasting!"
for sub in subs[:]:
sub.put(msg)
gevent.spawn(notify, self.subscriptions, msg)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def subscribe(self):
print "subscribed"
q = Queue()
self.subscriptions.append(q)
try:
while True:
result = q.get()
yield "data: %s\n\n"%result
except GeneratorExit: # Or maybe use flask signals
subscriptions.remove(q)
app = Flask(__name__)
handler = SSEHandler()
handler.setLevel(logging.DEBUG)
worker = None
# Client code consumes like this.
@app.route("/")
def index():
debug_template = """
<html>
<head>
</head>
<body>
<h1>Server sent events</h1>
<div id="event"></div>
<script type="text/javascript">
var eventOutputContainer = document.getElementById("event");
var evtSrc = new EventSource("/subscribe");
evtSrc.onmessage = function(e) {
console.log(e.data);
eventOutputContainer.innerHTML = e.data;
};
</script>
</body>
</html>
"""
return(debug_template)
@app.route("/subscribe")
def subscribe():
return Response(handler.subscribe(), mimetype="text/event-stream")
@app.route("/start")
def start():
def run():
global worker
global handler
worker = Worker(handler)
worker.go()
threading.Thread(target=run).start()
return "Going"
if __name__ == "__main__":
app.debug = True
server = WSGIServer(("", 5000), app)
server.serve_forever()
worker.py
import logging
import time
class Worker:
def __init__(self, handler):
self.log = logging.getLogger('sselog.worker.Worker')
self.log.setLevel(logging.DEBUG)
self.log.addHandler(handler)
self.log.info("Initialized")
def go(self):
i = 0
while True:
time.sleep(1)
self.log.info("I'm working so hard %u", i)
i+=1
1 个回答
0
问题在于我用了普通的线程和休眠,而不是使用gevent这个库。后来我把它改成了gevent的方式,或者应用了一个猴子补丁(monkey patch),结果一切都正常运作了。