实现和测试WebSocket服务器连接超时

3 投票
2 回答
5294 浏览
提问于 2025-04-18 08:11

我正在用Tornado 3.2实现一个WebSockets服务器。连接到这个服务器的客户端不是浏览器。

在服务器和客户端之间需要来回通信的情况下,我想设置一个最大等待时间,也就是服务器在关闭连接之前会等客户端的响应多长时间。

我大概是这样尝试的:

import datetime
import tornado

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.instance().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

我是不是太笨了,有没有更简单的方法来做到这一点?我甚至连通过上面的add_timeout安排一个简单的打印语句都做不到。

我还需要一些帮助来测试这个。到目前为止,我有这些:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait longer than the timeout.
        # The test is in its own IOLoop, so a blocking sleep should be okay?
        time.sleep(1.1)

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()

        self.assertNotEqual(result, 'Second response.')

客户端在写入和读取socket时没有问题。这可能是因为add_timeout没有被触发。

测试是否需要以某种方式让出控制权,以便服务器上的超时回调能够运行?我本以为不需要,因为文档说测试是在它们自己的IOLoop中运行的。

编辑

这是根据Ben的建议修改后的可用版本。

import datetime
import tornado

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.current().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.current().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

测试:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait a little more than the timeout.
        yield gen.Task(self.io_loop.add_timeout, datetime.timedelta(seconds=1.1))

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()
        self.assertEqual(result, None)

2 个回答

2

嘿,Ben,我知道这个问题早就解决了,但我想和正在阅读这个内容的用户分享一下我找到的解决方案。这个方案基本上是基于你的,但它解决了一个来自外部服务的问题,这个服务可以很容易地通过组合的方式集成到任何websocket中,而不是使用继承。

class TimeoutWebSocketService():
    _default_timeout_delta_ms = 10 * 60 * 1000  # 10 min

    def __init__(self, websocket, ioloop=None, timeout=None):
        # Timeout
        self.ioloop = ioloop or tornado.ioloop.IOLoop.current()
        self.websocket = websocket
        self._timeout = None
        self._timeout_delta_ms = timeout or TimeoutWebSocketService._default_timeout_delta_ms

    def _close_on_timeout(self):
        self._timeout = None
        if self.websocket.ws_connection:
            self.websocket.close()

    def refresh_timeout(self, timeout=None):
        timeout = timeout or self._timeout_delta_ms
        if timeout > 0:
            # Clean last timeout, if one exists
            self.clean_timeout()

            # Add a new timeout (must be None from clean).
            self._timeout = self.ioloop.add_timeout(
                datetime.timedelta(milliseconds=timeout), self._close_on_timeout)

    def clean_timeout(self):
        if self._timeout is not None:
            # Remove previous timeout, if one exists.
            self.ioloop.remove_timeout(self._timeout)
            self._timeout = None

使用这个服务非常简单,只需要创建一个新的TimeoutWebService实例(可以选择性地设置超时时间,单位是毫秒,以及它应该执行的ioloop),然后调用方法“refresh_timeout”来设置超时时间,或者重置已经存在的超时时间,或者使用“clean_timeout”来停止超时服务。

class BaseWebSocketHandler(WebSocketHandler):
    def prepare(self):
        self.timeout_service = TimeoutWebSocketService(timeout=(1000*60))

        ## Optionally starts the service here 
        self.timeout_service.refresh_timeout()

        ## rest of prepare method 

    def on_message(self):
        self.timeout_service.refresh_timeout()

    def on_close(self):
        self.timeout_service.clean_timeout()

通过这种方式,你可以控制确切的时间和条件,决定何时重启超时,这可能因应用而异。举个例子,你可能只想在用户满足某些条件时刷新超时,或者当消息是预期的内容时。

希望大家喜欢这个解决方案!

2

我觉得你第一个例子里的超时处理代码是没问题的。

在测试的时候,每个测试案例都有自己的IOLoop,但其实测试和其他运行的东西只用一个IOLoop,所以在这里你也得用add_timeout,而不是time.sleep(),这样才能避免让服务器卡住。

撰写回答