python中使用websocket库和asyncio的通用websocket客户端

2024-04-20 05:10:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图参考 websockets 文档拼凑出一个基本的通用客户端接口,并以这个示例作为起点

下面的代码在一个循环中发送10个字符串,但它们要么没有从echo服务器返回,要么没有到达consumer方法

我尝试用以下代码替换下面代码中的方法

async def consumer_handler(self, websocket, path):
    """Receive messages in an endless loop, waking up at least once a second.

    Each ``recv`` is bounded by a one-second ``asyncio.wait_for``. When the
    wait times out, a debug line is logged and the loop simply tries again.
    Every received message is logged and passed to ``self.consumer``.

    ``path`` is accepted but not used. "Ended consumer" is logged on any
    exit from the loop, because it sits in the ``finally`` clause.
    """
    try:
        logger.debug("Beginning consumer")
        while True:
            try:
                incoming = websocket.recv()
                message = await asyncio.wait_for(incoming, 1.0)
                logger.debug(f"Recieved message {message}")
                await self.consumer(message)
            except asyncio.TimeoutError:
                # Nothing arrived within the window; report it and poll again.
                logger.debug("Consumer timeout")
                continue
    finally:
        logger.debug("Ended consumer")

……但这没有任何区别。此外,“Consumer timeout”和“Recieved message”这两条日志都没有打印出来,因此看起来在 producer_handler 启动之后,这个循环就不再运行了(consumer_handler 确实启动过,因为记录了“Beginning consumer”字符串)。我还注意到,“Ended consumer”和“Ended producer”字符串从未被记录

import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time

# Module-wide logger: the root logger, emitting DEBUG and above.
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
# Each record shows a timestamp, the emitting thread's name, and the message.
logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')

class Application:
    """Holder for the worker thread pool that clients submit their work to.

    The pool is created with at most eight worker threads. Call ``close``
    when finished so the threads are released.
    """

    def __init__(self):
        pool = ThreadPoolExecutor(max_workers=8)
        self.executor = pool

    def getExecutor(self):
        """Return the thread pool created in ``__init__``."""
        return self.executor

    def close(self):
        """Shut the pool down, blocking until already-submitted work is done."""
        self.executor.shutdown(wait=True)


class Client:
    """Websocket client with synchronous ``connect`` / ``sendMessage`` / ``close``.

    The asyncio code is run on a thread of the application's executor.
    Outgoing messages are handed over through a ``queue.Queue``.

    NOTE(review): this is the version the question reports as not working;
    see the notes on ``producer`` and ``_close`` for the likely causes.
    """
    
    ###########################################################################
    # Business Logic
        
    def sendMessage(self, message):
        """Put *message* on the outgoing queue that ``producer`` reads from."""
        self.messageQueue.put(message)
    
    async def consumer(self, message):
        """Handle one received message; here it is only logged."""
        logger.debug(f"Consumed message {message}")
        
    ###########################################################################
    
    ###########################################################################
    # General
    
    def __init__(self, app):
        """Store *app* (its executor is used later) and create an empty queue."""
        
        self.app = app
        
        self.messageQueue = Queue()
        # Set by __connect once the connection has been opened.
        self.ws = None
    
    async def producer(self):
        """Return the next outgoing message from the queue."""
        logger.debug("In producer")
        # NOTE(review): queue.Queue.get() is a blocking call, not an awaitable.
        # While the queue is empty it blocks the calling thread, which here is
        # the thread running the event loop, so no other task on that loop
        # (e.g. consumer_handler) can make progress in the meantime.
        return self.messageQueue.get()
    
    
    def connect(self, uri):
        """Submit ``_connect(uri)`` to the executor; does not wait for it."""
        self.app.getExecutor().submit(partial(self._connect, uri))
    
    def _connect(self, uri):
        """Run ``__connect`` to completion in a new event loop (debug mode on)."""
        asyncio.run(self.__connect(uri), debug=True)
    
    async def __connect(self, uri):
        """Open the connection, keep it in ``self.ws``, then run ``handler``."""
        logger.debug("Connecting")
        self.ws = await websockets.connect(uri)  
        await self.handler(self.ws, uri)
    
    def close(self):
         """Submit ``_close`` to the executor; does not wait for it."""
         self.app.getExecutor().submit(self._close)
         
    def _close(self):
        """Run ``__close`` via ``asyncio.run``, i.e. in a newly created event loop."""
        # NOTE(review): asyncio.run() always creates its own event loop, so
        # this is not the loop started by _connect on which self.ws was
        # opened. Confirm whether the connection can be closed from another
        # loop/thread.
        asyncio.run(self.__close(), debug=True)
    
    async def __close(self):
        """Close the websocket stored in ``self.ws``."""
        logger.debug("Ending client")
        # self.ws is None until __connect has opened the connection.
        await self.ws.close()

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # remaining part of class is from 
    # https://websockets.readthedocs.io/en/stable/intro.html#both

    async def consumer_handler(self, websocket, path):
        """Log every message iterated from *websocket* and pass it to ``consumer``.

        *path* is accepted but not used.
        """
        try:
            logger.debug("Beginning consumer")
            async for message in websocket:
                logger.debug(f"Recieved message {message}")
                await self.consumer(message)
        finally:
            logger.debug("Ended consumer")

            
    async def producer_handler(self, websocket, path):
        """Forever take a message from ``producer`` and send it on *websocket*.

        *path* is accepted but not used.
        """
        logger.debug("Beginning producer")
        try:
            while True:
                message = await self.producer()
                await websocket.send(message)
                logger.debug(f"Sent: {message}")
        finally:
            logger.debug("Ended producer")
            
    
    async def handler(self, websocket, path):
        """Run the consumer and producer handlers as two tasks.

        Returns once the first of them finishes; the other one is cancelled
        (the cancelled task is not awaited here).
        """
        consumer_task = asyncio.create_task(
            self.consumer_handler(websocket, path))
        producer_task = asyncio.create_task(
            self.producer_handler(websocket, path))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
        
def main():
    """Demo driver: connect to the echo server, send ten greetings, shut down.

    The ``time.sleep`` calls pace the main thread while the client does its
    work on an executor thread. The client is closed, and then the
    application's executor shut down, even if an earlier step raises.
    """
    echo_uri = "ws://echo.websocket.org/"

    logger.debug("Beginning client")

    app = Application()
    try:
        client = Client(app)
        try:
            client.connect(echo_uri)
            # connect() returns immediately; pause before queueing messages.
            time.sleep(2)

            greetings = (f"Hello {i}" for i in range(10))
            for text in greetings:
                client.sendMessage(text)
                time.sleep(0.5)

            # Leave some time for outstanding traffic before closing.
            time.sleep(3)
        finally:
            client.close()
            time.sleep(2)
    finally:
        app.close()


if __name__ == '__main__':
    main()

Tags: producer debug self asyncio app message task close
1条回答
网友
1楼 · 发布于 2024-04-20 05:10:06

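问题在于 producer_handler 任务出于某种原因没有让出事件循环,尽管它已经 await 了 websocket.send(我原以为这样就会让出事件循环;我怀疑任何照着文档做的人都会遇到这个问题)。为了解决这个问题,我添加了一个 asyncio.sleep 调用,它确实会让出事件循环,从而使 consumer_handler 得以继续执行。(注:producer 协程中调用的 queue.Queue.get() 是阻塞调用,队列为空时会阻塞运行事件循环的线程,这很可能才是事件循环没有被让出的真正原因。)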

async def producer_handler(self, websocket, path):
    """Forever fetch a message from ``self.producer`` and send it.

    After every send the coroutine sleeps for one second. ``path`` is
    accepted but not used. "Ended producer" is logged on any exit.
    """
    logger.debug("Beginning producer")
    try:
        while True:
            next_message = self.producer()
            message = await next_message
            await websocket.send(message)
            logger.debug(f"Sent: {message}")

            # The workaround: while this coroutine is suspended in the sleep,
            # the event loop is free to run other tasks such as
            # consumer_handler.
            await asyncio.sleep(1)
    finally:
        logger.debug("Ended producer")

处理程序无法正确关闭,这也是一个与线程有关的问题。我将 __connect 和 close 函数更改为

async def __connect(self, uri):
    """Open the websocket connection and run the handler on it.

    The current event loop is kept in ``self.loop`` so that ``close`` can
    later schedule work on it; the connection is kept in ``self.ws``.
    """
    current_loop = asyncio.get_event_loop()
    self.loop = current_loop
    logger.debug("Connecting")
    connection = await websockets.connect(uri)
    self.ws = connection
    await self.handler(connection, uri)

def close(self):
    """Schedule ``__close`` on the loop stored in ``self.loop``.

    ``run_coroutine_threadsafe`` is used so this can be called from a thread
    other than the one running that loop. The returned future is discarded,
    so the call does not wait for the close to finish.
    """
    shutdown = self.__close()
    asyncio.run_coroutine_threadsafe(shutdown, self.loop)

现在一切都按预期工作了

完整的工作示例

import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time

# Module-wide logger: the root logger, emitting DEBUG and above.
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
# Each record shows a timestamp, the emitting thread's name, and the message.
logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')

class Application:
    """Holder for the worker thread pool that clients submit their work to.

    The pool is created with at most eight worker threads. Call ``close``
    when finished so the threads are released.
    """

    def __init__(self):
        pool = ThreadPoolExecutor(max_workers=8)
        self.executor = pool

    def getExecutor(self):
        """Return the thread pool created in ``__init__``."""
        return self.executor

    def close(self):
        """Shut the pool down, blocking until already-submitted work is done."""
        self.executor.shutdown(wait=True)


class Client:
    """Websocket client with synchronous ``connect`` / ``sendMessage`` / ``close``.

    The asyncio code runs in its own event loop on a thread of the
    application's executor. Other threads hand outgoing messages over through
    a thread-safe ``queue.Queue`` and request shutdown through
    ``asyncio.run_coroutine_threadsafe``.
    """

    # Seconds the producer waits between checks of an empty outgoing queue.
    POLL_INTERVAL = 0.05

    ###########################################################################
    # Business Logic

    def sendMessage(self, message):
        """Queue *message* for sending; may be called from any thread."""
        self.messageQueue.put(message)

    async def consumer(self, message):
        """Handle one received message; here it is only logged."""
        logger.debug(f"Consumed message {message}")

    ###########################################################################

    ###########################################################################
    # General

    def __init__(self, app):
        """Create an unconnected client.

        Args:
            app: object whose ``getExecutor()`` returns the executor on which
                the connection's event loop will be run.
        """
        self.app = app

        self.messageQueue = Queue()
        # Both are filled in by __connect on the connection thread.
        self.ws = None
        self.loop = None

    async def producer(self):
        """Return the next outgoing message without blocking the event loop.

        ``queue.Queue.get()`` is a blocking call: using it here would freeze
        the thread that runs the event loop whenever the queue is empty, and
        no other task (such as the consumer) could run. Instead the queue is
        polled with ``get_nowait`` and the coroutine sleeps on the event loop
        between polls, which also keeps it cancellable.
        """
        from queue import Empty

        logger.debug("In producer")
        while True:
            try:
                return self.messageQueue.get_nowait()
            except Empty:
                await asyncio.sleep(self.POLL_INTERVAL)

    def connect(self, uri):
        """Start connecting to *uri* on an executor thread; does not wait."""
        self.app.getExecutor().submit(partial(self._connect, uri))

    def _connect(self, uri):
        """Run ``__connect`` to completion in a new event loop (debug mode on)."""
        asyncio.run(self.__connect(uri), debug=True)

    async def __connect(self, uri):
        """Record the running loop, open the connection and run ``handler``."""
        # Remembered so that close() can schedule work on this loop from
        # another thread.
        self.loop = asyncio.get_running_loop()
        logger.debug("Connecting")
        self.ws = await websockets.connect(uri)
        await self.handler(self.ws, uri)

    def close(self):
        """Ask the connection's event loop to close the websocket.

        Safe to call from any thread. Does nothing when the connection thread
        has not started its loop yet or the loop has already been closed.
        The call does not wait for the close to complete.
        """
        loop = self.loop
        if loop is None:
            return
        shutdown = self.__close()
        try:
            asyncio.run_coroutine_threadsafe(shutdown, loop)
        except RuntimeError:
            # The loop was already closed; discard the coroutine so it does
            # not trigger a "never awaited" warning.
            shutdown.close()

    async def __close(self):
        """Close the websocket if one has been opened."""
        logger.debug("Ending client")
        if self.ws is not None:
            await self.ws.close()

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # remaining part of class is from
    # https://websockets.readthedocs.io/en/stable/intro.html#both

    async def consumer_handler(self, websocket, path):
        """Log every message iterated from *websocket* and pass it to ``consumer``.

        *path* is accepted for signature compatibility but not used.
        """
        try:
            logger.debug("Beginning consumer")
            async for message in websocket:
                logger.debug(f"Recieved message {message}")
                await self.consumer(message)
        finally:
            logger.debug("Ended consumer")

    async def producer_handler(self, websocket, path):
        """Forever take a message from ``producer`` and send it on *websocket*.

        *path* is accepted for signature compatibility but not used.
        """
        logger.debug("Beginning producer")
        try:
            while True:
                message = await self.producer()
                await websocket.send(message)
                logger.debug(f"Sent: {message}")
                # Yield to the event loop once per message so other tasks get
                # a turn even during a burst, without throttling the sender
                # (this replaces the former one-second sleep workaround).
                await asyncio.sleep(0)
        finally:
            logger.debug("Ended producer")

    async def handler(self, websocket, path):
        """Run the producer and consumer handlers concurrently on one loop.

        Returns when the first of the two finishes. The other one is
        cancelled and awaited so its ``finally`` clause has run before this
        coroutine returns. An exception raised by a finished task is logged
        rather than left unretrieved.
        """
        producer_task = asyncio.ensure_future(
            self.producer_handler(websocket, path))
        consumer_task = asyncio.ensure_future(
            self.consumer_handler(websocket, path))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
        # Let the cancellations complete; CancelledError is collected, not raised.
        await asyncio.gather(*pending, return_exceptions=True)
        for task in done:
            if task.cancelled():
                continue
            error = task.exception()
            if error is not None:
                logger.error("Handler task failed", exc_info=error)

def main():
    """Demo driver: connect to the echo server, send ten greetings, shut down.

    The ``time.sleep`` calls pace the main thread while the client does its
    work on an executor thread. The client is closed, and then the
    application's executor shut down, even if an earlier step raises.
    """
    echo_uri = "ws://echo.websocket.org/"

    logger.debug("Beginning client")

    app = Application()
    try:
        client = Client(app)
        try:
            client.connect(echo_uri)
            # connect() returns immediately; pause before queueing messages.
            time.sleep(2)

            greetings = (f"Hello {i}" for i in range(10))
            for text in greetings:
                client.sendMessage(text)
                time.sleep(0.5)

            # Leave some time for outstanding traffic before closing.
            time.sleep(1)
        finally:
            client.close()
            # close() only schedules the shutdown; wait before stopping the pool.
            time.sleep(3)
    finally:
        app.close()


if __name__ == '__main__':
    main()

相关问题 更多 >