我有这段代码
class RTMClient:
...
#not important code
...
async def connect(self, queue: asyncio.Queue):
"""
Connect to the websocket stream and iterate over the messages
dumping them in the Queue.
"""
ws_url=''#url aquisition amended to readability
try:
self._ws = await websockets.connect(ws_url)
while not self.is_closed:
msg = await self._ws.recv()
if msg is None:
break
await queue.put(json.loads(msg))
except asyncio.CancelledError:
pass
finally:
self._closed.set()
self._ws = None
我想为它写一个自动测试。 我打算做的是:
websockets.connect
返回模拟连接is_closed
设置为True
我的问题:我如何模仿websockets.connection
来实现步骤1-3?
我在想一个像这样的pytest设备
但我不知道如何才能以这种方式返回预定义的消息列表,而且必须有更好的方法来实现这一点。在
这不是一个完整的答案,但也许它会帮助你。在
我在测试rabbitmq消息发射时遇到了simlar问题。看起来这里最常见和最健壮的方法是创建},将{}连接到伪的{},并在测试中手动向该通道发送消息。在
Connection
,创建Emmiter
和{所以你创建了所有的对象并嘲笑它们的反应。下面是一个非常相似的示例(使用套接字而不是websockets,但可能仍然对您有用):mocking a socket connection in Python
相关问题 更多 >
编程相关推荐