实现WSGI流服务:(如何检测客户端断开连接)

5 投票
2 回答
1406 浏览
提问于 2025-04-17 07:38

我正在写一个WSGI流服务,利用一个被包装在迭代器中的队列来实现多播推送。接下来是这个服务的简化模型:

# this is managed by another thread
def processor_runner():
    generator = SerialMessageGenerator()
    for message in generator:
      for client in Processor.connections:
          client.put(message)

# this is managed by twisted's wsgi implementation
def main(environ, start_response):
    queue = Queue()
    Processor.connections.append(queue)
    status = '200 OK'
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Transfer-Encoding', 'chunked')
    ]
    start_response(status, response_headers)
    return iter(queue.get, None)

这个服务在使用twisted作为WSGI服务器时运行得很好(顺便提一下,串行生成器是一个通过进程间队列连接到处理器的单独进程)。我想知道如何检测客户端何时断开连接,从而将其从队列中移除?我的想法是将队列和客户端的socket一起放入一个元组中,比如(socket, queue),然后在执行put操作之前检查socket是否仍然连接。不过,我不太确定从environ中该获取什么。有没有人有这方面的经验,可以在我胡乱拼凑之前给点建议?

更新

这是我最终选择的解决方案:

class IterableQueue(Queue):

def __init__(self):
    Queue.__init__(self) # Queue is an old style class
    ShellProcessor.connections.append(self)

def __iter__(self):
    return iter(self.get, None)

def close(self):
    self.put(None)
    self.task_done()
    ShellProcessor.connections.remove(self)

2 个回答

0

阅读:

http://groups.google.com/group/modwsgi/browse_frm/thread/8ebd9aca9d317ac9

这里有些内容是关于mod_wsgi的,但一般来说,任何WSGI服务器都会遇到类似的问题。

1

当请求完成或被中断时,twisted会对迭代器调用.close(),如果这个迭代器存在的话。你可以这样做:

# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
     on_finish=lambda: Processor.connections.remove(queue))

其中ResponseIterator可以是:

class ResponseIterator:

  def __init__(self, iterator, on_finish=None):
      self.iterator = iterator
      self.on_finish = on_finish

  def __iter__(self):
      return self

  def next(self):
      return next(self.iterator)

  def close(self):
      if self.on_finish is not None:
         self.on_finish()

撰写回答