实现和测试WebSocket服务器连接超时
我正在用Tornado 3.2实现一个WebSockets服务器。连接到这个服务器的客户端不是浏览器。
在服务器和客户端之间需要来回通信的情况下,我想设置一个最大等待时间,也就是服务器在关闭连接之前会等客户端的响应多长时间。
我大概是这样尝试的:
import datetime
import tornado
class WSHandler(WebSocketHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = None
def _close_on_timeout(self):
if self.ws_connection:
self.close()
def open(self):
initialize()
def on_message(self, message):
# Remove previous timeout, if one exists.
if self.timeout:
tornado.ioloop.IOLoop.instance().remove_timeout(self.timeout)
self.timeout = None
if is_last_message:
self.write_message(message)
self.close()
else:
# Add a new timeout.
self.timeout = tornado.ioloop.IOLoop.instance().add_timeout(
datetime.timedelta(milliseconds=1000), self._close_on_timeout)
self.write_message(message)
我是不是太笨了,有没有更简单的方法来做到这一点?我甚至连通过上面的add_timeout安排一个简单的打印语句都做不到。
我还需要一些帮助来测试这个。到目前为止,我有这些:
from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time
class WSTests(AsyncHTTPTestCase):
@gen_test
def test_long_response(self):
ws = yield websocket_connect('ws://address', io_loop=self.io_loop)
# First round trip.
ws.write_message('First message.')
result = yield ws.read_message()
self.assertEqual(result, 'First response.')
# Wait longer than the timeout.
# The test is in its own IOLoop, so a blocking sleep should be okay?
time.sleep(1.1)
# Expect either write or read to fail because of a closed socket.
ws.write_message('Second message.')
result = yield ws.read_message()
self.assertNotEqual(result, 'Second response.')
客户端在写入和读取socket时没有问题。这可能是因为add_timeout没有被触发。
测试是否需要以某种方式让出控制权,以便服务器上的超时回调能够运行?我本以为不需要,因为文档说测试是在它们自己的IOLoop中运行的。
编辑
这是根据Ben的建议修改后的可用版本。
import datetime
import tornado
class WSHandler(WebSocketHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = None
def _close_on_timeout(self):
if self.ws_connection:
self.close()
def open(self):
initialize()
def on_message(self, message):
# Remove previous timeout, if one exists.
if self.timeout:
tornado.ioloop.IOLoop.current().remove_timeout(self.timeout)
self.timeout = None
if is_last_message:
self.write_message(message)
self.close()
else:
# Add a new timeout.
self.timeout = tornado.ioloop.IOLoop.current().add_timeout(
datetime.timedelta(milliseconds=1000), self._close_on_timeout)
self.write_message(message)
测试:
from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time
class WSTests(AsyncHTTPTestCase):
@gen_test
def test_long_response(self):
ws = yield websocket_connect('ws://address', io_loop=self.io_loop)
# First round trip.
ws.write_message('First message.')
result = yield ws.read_message()
self.assertEqual(result, 'First response.')
# Wait a little more than the timeout.
yield gen.Task(self.io_loop.add_timeout, datetime.timedelta(seconds=1.1))
# Expect either write or read to fail because of a closed socket.
ws.write_message('Second message.')
result = yield ws.read_message()
self.assertEqual(result, None)
2 个回答
嘿,Ben,我知道这个问题早就解决了,但我想和正在阅读这个内容的用户分享一下我找到的解决方案。这个方案基本上是基于你的,但它解决了一个来自外部服务的问题,这个服务可以很容易地通过组合的方式集成到任何websocket中,而不是使用继承。
class TimeoutWebSocketService():
_default_timeout_delta_ms = 10 * 60 * 1000 # 10 min
def __init__(self, websocket, ioloop=None, timeout=None):
# Timeout
self.ioloop = ioloop or tornado.ioloop.IOLoop.current()
self.websocket = websocket
self._timeout = None
self._timeout_delta_ms = timeout or TimeoutWebSocketService._default_timeout_delta_ms
def _close_on_timeout(self):
self._timeout = None
if self.websocket.ws_connection:
self.websocket.close()
def refresh_timeout(self, timeout=None):
timeout = timeout or self._timeout_delta_ms
if timeout > 0:
# Clean last timeout, if one exists
self.clean_timeout()
# Add a new timeout (must be None from clean).
self._timeout = self.ioloop.add_timeout(
datetime.timedelta(milliseconds=timeout), self._close_on_timeout)
def clean_timeout(self):
if self._timeout is not None:
# Remove previous timeout, if one exists.
self.ioloop.remove_timeout(self._timeout)
self._timeout = None
使用这个服务非常简单,只需要创建一个新的TimeoutWebService实例(可以选择性地设置超时时间,单位是毫秒,以及它应该执行的ioloop),然后调用方法“refresh_timeout”来设置超时时间,或者重置已经存在的超时时间,或者使用“clean_timeout”来停止超时服务。
class BaseWebSocketHandler(WebSocketHandler):
def prepare(self):
self.timeout_service = TimeoutWebSocketService(timeout=(1000*60))
## Optionally starts the service here
self.timeout_service.refresh_timeout()
## rest of prepare method
def on_message(self):
self.timeout_service.refresh_timeout()
def on_close(self):
self.timeout_service.clean_timeout()
通过这种方式,你可以控制确切的时间和条件,决定何时重启超时,这可能因应用而异。举个例子,你可能只想在用户满足某些条件时刷新超时,或者当消息是预期的内容时。
希望大家喜欢这个解决方案!
我觉得你第一个例子里的超时处理代码是没问题的。
在测试的时候,每个测试案例都有自己的IOLoop,但其实测试和其他运行的东西只用一个IOLoop,所以在这里你也得用add_timeout,而不是time.sleep(),这样才能避免让服务器卡住。