我试图从websockets文档中拼凑出一个基本的通用客户端接口。我使用this示例作为起点
下面的代码在一个循环中发送10个字符串,但它们要么没有从echo服务器返回,要么没有到达consumer
方法
我尝试用以下代码替换下面代码中的方法
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
while True:
try:
message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
logger.debug(f"Recieved message {message}")
await self.consumer(message)
except asyncio.TimeoutError:
logger.debug("Consumer timeout")
finally:
logger.debug("Ended consumer")
。。。但这没什么区别。此外,不会打印“使用者超时”或“接收到的消息”日志,因此似乎在生产者\处理程序启动后,此循环没有运行(使用者\处理程序在记录“开始使用者”字符串时启动)。我还注意到,“终端消费者”和“终端生产者”字符串从未被记录
import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time
logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
class Application:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=8)
def getExecutor(self):
return self.executor
def close(self):
self.executor.shutdown()
class Client:
###########################################################################
# Business Logic
def sendMessage(self, message):
self.messageQueue.put(message)
async def consumer(self, message):
logger.debug(f"Consumed message {message}")
###########################################################################
###########################################################################
# General
def __init__(self, app):
self.app = app
self.messageQueue = Queue()
self.ws = None
async def producer(self):
logger.debug("In producer")
return self.messageQueue.get()
def connect(self, uri):
self.app.getExecutor().submit(partial(self._connect, uri))
def _connect(self, uri):
asyncio.run(self.__connect(uri), debug=True)
async def __connect(self, uri):
logger.debug("Connecting")
self.ws = await websockets.connect(uri)
await self.handler(self.ws, uri)
def close(self):
self.app.getExecutor().submit(self._close)
def _close(self):
asyncio.run(self.__close(), debug=True)
async def __close(self):
logger.debug("Ending client")
await self.ws.close()
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# remaining part of class is from
# https://websockets.readthedocs.io/en/stable/intro.html#both
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
async for message in websocket:
logger.debug(f"Recieved message {message}")
await self.consumer(message)
finally:
logger.debug("Ended consumer")
async def producer_handler(self, websocket, path):
logger.debug("Beginning producer")
try:
while True:
message = await self.producer()
await websocket.send(message)
logger.debug(f"Sent: {message}")
finally:
logger.debug("Ended producer")
async def handler(self, websocket, path):
consumer_task = asyncio.create_task(
self.consumer_handler(websocket, path))
producer_task = asyncio.create_task(
self.producer_handler(websocket, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
def main():
ADDRESS = "ws://echo.websocket.org/"
logger.debug("Beginning client")
app = Application()
try:
client = Client(app)
try:
client.connect(ADDRESS)
time.sleep(2)
for i in range(10):
client.sendMessage(f"Hello {i}")
time.sleep(0.5)
time.sleep(3)
finally:
client.close()
time.sleep(2)
finally:
app.close()
if __name__ == '__main__':
main()
问题是
producer_handler
任务没有出于某种原因放弃事件循环,尽管等待了websocket.send
(我认为这是预期的,我怀疑这对于遵循文档的任何人来说都是一个问题)。为了克服这个问题,我添加了一个asyncio.sleep
调用,它确实放弃了事件循环,允许重新输入consumer_handler
处理程序没有正确关闭也是线程的一个问题。我将
__connect
和close
函数更改为现在一切都如期而至
完整的工作示例
相关问题 更多 >
编程相关推荐