实现WSGI流服务:(如何检测客户端断开连接)
我正在写一个WSGI流服务,利用一个被包装在迭代器中的队列来实现多播推送。接下来是这个服务的简化模型:
# this is managed by another thread
def processor_runner():
generator = SerialMessageGenerator()
for message in generator:
for client in Processor.connections:
client.put(message)
# this is managed by twisted's wsgi implementation
def main(environ, start_response):
queue = Queue()
Processor.connections.append(queue)
status = '200 OK'
response_headers = [
('Content-Type', 'application/json'),
('Transfer-Encoding', 'chunked')
]
start_response(status, response_headers)
return iter(queue.get, None)
这个服务在使用twisted作为WSGI服务器时运行得很好(顺便提一下,串行生成器是一个通过进程间队列连接到处理器的单独进程)。我想知道如何检测客户端何时断开连接,从而将其从队列中移除?我的想法是将队列和客户端的socket一起放入一个元组中,比如(socket, queue),然后在执行put操作之前检查socket是否仍然连接。不过,我不太确定从environ中该获取什么。有没有人有这方面的经验,可以在我胡乱拼凑之前给点建议?
更新
这是我最终选择的解决方案:
class IterableQueue(Queue):
def __init__(self):
Queue.__init__(self) # Queue is an old style class
ShellProcessor.connections.append(self)
def __iter__(self):
return iter(self.get, None)
def close(self):
self.put(None)
self.task_done()
ShellProcessor.connections.remove(self)
2 个回答
0
阅读:
http://groups.google.com/group/modwsgi/browse_frm/thread/8ebd9aca9d317ac9
这里有些内容是关于mod_wsgi的,但一般来说,任何WSGI服务器都会遇到类似的问题。
1
当请求完成或被中断时,twisted会对迭代器调用.close()
,如果这个迭代器存在的话。你可以这样做:
# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
on_finish=lambda: Processor.connections.remove(queue))
其中ResponseIterator
可以是:
class ResponseIterator:
def __init__(self, iterator, on_finish=None):
self.iterator = iterator
self.on_finish = on_finish
def __iter__(self):
return self
def next(self):
return next(self.iterator)
def close(self):
if self.on_finish is not None:
self.on_finish()