Pytest:测试websocket连接

2024-05-01 21:49:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我有这段代码

class RTMClient:
    ...
    #not important code
    ...    
    async def connect(self, queue: asyncio.Queue):
        """
        Connect to the websocket stream and iterate over the messages
        dumping them in the Queue.
        """
        ws_url=''#url aquisition amended to readability
        try:
            self._ws = await websockets.connect(ws_url)
            while not self.is_closed:
                msg = await self._ws.recv()
                if msg is None:
                    break
                await queue.put(json.loads(msg))

        except asyncio.CancelledError:
            pass
        finally:
            self._closed.set()
            self._ws = None

我想为它写一个自动测试。 我打算做的是:

  1. Monkeypatchwebsockets.connect返回模拟连接
  2. 使模拟连接从预定义列表返回模拟消息
  3. 将模拟连接集is_closed设置为True
  4. 断言websocket连接已关闭
  5. 断言所有预定义的消息都在队列中

我的问题:我如何模仿websockets.connection来实现步骤1-3? 我在想一个像这样的pytest设备

^{pr2}$

但我不知道如何才能以这种方式返回预定义的消息列表,而且必须有更好的方法来实现这一点。在


Tags: thetoselfasyncio消息urlwsqueue
1条回答
网友
1楼 · 发布于 2024-05-01 21:49:58

这不是一个完整的答案,但也许它会帮助你。在

我在测试rabbitmq消息发射时遇到了simlar问题。看起来这里最常见和最健壮的方法是创建Connection,创建Emmiter和{},将{}连接到伪的{},并在测试中手动向该通道发送消息。在

所以你创建了所有的对象并嘲笑它们的反应。下面是一个非常相似的示例(使用套接字而不是websockets,但可能仍然对您有用):mocking a socket connection in Python

相关问题 更多 >