Reading data over WebSocket and sending it to multiple clients

Posted 2024-03-29 12:21:06

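I am learning WebSocket in Python. I am currently using the websockets library together with asyncio.
My requirements are as follows: read some data (this may be a time-consuming routine) and send the data that was read to the clients at a fixed interval.
I also need the server to receive commands from clients and to update all clients with each command received.
The reading would then run asynchronously, and so would the commands. Here is the code I have built so far: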

#!/usr/bin/env python

# WS server that sends messages at random intervals

import asyncio
import datetime
import random
import websockets
import json
import logging
from threading import Thread # for threads

#initial value for variable
STATE = {"value": 0}

USERS = set()

MSG = ""

#routine that returns the value received from users
def state_event():
    return json.dumps({"type": "state", **STATE})

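# routine that sends the current state to all users
async def notify_state():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = state_event()
        # wrap each send in a task: asyncio.wait rejects bare coroutines on Python 3.11+
        await asyncio.wait([asyncio.create_task(user.send(message)) for user in USERS])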

#routine that counts Users and returns the amount
def users_event():
    return json.dumps({"type": "users", "count": len(USERS)})

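# routine that sends the updated user count to all users
async def notify_users():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = users_event()
        # wrap each send in a task: asyncio.wait rejects bare coroutines on Python 3.11+
        await asyncio.wait([asyncio.create_task(user.send(message)) for user in USERS])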

# routine that adds users to the set and invokes the updater
async def register(websocket):
    USERS.add(websocket)
    await notify_users()

#routine that removes users from the set and invokes the updater
async def unregister(websocket):
    USERS.remove(websocket)
    await notify_users()

# Routine that will perform the time-consuming operation (I used print to delay)
def get_string_data():
    while True:
        for x in range(4000):
            y = str(x)
            print(x)
    return str(random.random() * 3)

# routine that takes the date and time and builds the message to transmit
def message_event():
    nowt = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
    return json.dumps({"type": "message", "value" : nowt})

    
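async def notify_message_event():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = message_event()
        # wrap each send in a task: asyncio.wait rejects bare coroutines on Python 3.11+
        await asyncio.wait([asyncio.create_task(user.send(message)) for user in USERS])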

   
# routine that sends the data
async def time(websocket, path):
    await register(websocket)
    try:
     
        #I put the time-consuming routine on a thread to run independently and always
        t1 = Thread(target = get_string_data)
        t1.daemon = True  # setDaemon() is deprecated since Python 3.10
        t1.start()

        while True:
            # routine below is never performed
            websocket.send(message_event())

            # These are performed
            await websocket.send(state_event())
            async for message in websocket:
                data = json.loads(message)
                print(data)
                if data["action"] == "minus":
                    STATE["value"] -= 1
                    await notify_state()
                elif data["action"] == "plus":
                    STATE["value"] += 1
                    await notify_state()
                else:
                    logging.error("unsupported event: %s", data)
       
    except websockets.exceptions.ConnectionClosed:
        print('disconnected')
    
    finally:
        await unregister(websocket)


# start server
start_server = websockets.serve(time, "192.168.0.14", 6789)


#main
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

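As the comments in my code note, the routine that sends the data to the clients is never executed, although the other routines work correctly. What can I do to achieve my goal? Apologies for my grammar.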
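
One possible direction, offered as a sketch rather than a definitive answer. Two things in the handler look relevant: `websocket.send(...)` is a coroutine, so calling it without `await` only creates a coroutine object and sends nothing, and `async for message in websocket` keeps the handler inside that loop until the connection closes, so the surrounding `while True` never gets a second pass. A common way around both is to keep the per-connection handler for incoming commands only, and to run the periodic read as one separate task that hands the blocking read to a worker thread with `run_in_executor` and then broadcasts the result. The example below uses only the standard library so it can be run as is. `FakeClient`, `read_data`, `broadcast`, `periodic_reader`, and `demo` are hypothetical names, and `FakeClient` merely stands in for a real websockets connection.

```python
import asyncio
import json
import random
import time

CLIENTS = set()        # every connected client
STATE = {"value": 0}   # shared counter changed by client commands


class FakeClient:
    """Hypothetical stand-in for a websocket: send() plus async iteration."""

    def __init__(self, commands):
        self.sent = []                  # everything the server sent to this client
        self._commands = list(commands)
        self.closed = asyncio.Event()   # set() simulates a disconnect

    async def send(self, message):
        self.sent.append(json.loads(message))

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._commands:
            await asyncio.sleep(0.01)   # pretend the command arrives a bit later
            return self._commands.pop(0)
        await self.closed.wait()        # idle until "disconnected"
        raise StopAsyncIteration


def read_data():
    """Blocking, slow read (simulated here with time.sleep)."""
    time.sleep(0.05)
    return random.random() * 3


async def broadcast(payload):
    """Send one JSON message to every connected client."""
    if CLIENTS:
        message = json.dumps(payload)
        await asyncio.gather(*(c.send(message) for c in CLIENTS))


async def periodic_reader(interval):
    """Read in a worker thread so the event loop stays free, then broadcast."""
    loop = asyncio.get_running_loop()
    while True:
        value = await loop.run_in_executor(None, read_data)
        await broadcast({"type": "message", "value": value})
        await asyncio.sleep(interval)


async def handler(client):
    """Per-connection task: receive commands and broadcast the new state."""
    CLIENTS.add(client)
    try:
        await client.send(json.dumps({"type": "state", **STATE}))
        async for raw in client:
            action = json.loads(raw).get("action")
            if action == "plus":
                STATE["value"] += 1
            elif action == "minus":
                STATE["value"] -= 1
            else:
                continue   # ignore unsupported commands
            await broadcast({"type": "state", **STATE})
    finally:
        CLIENTS.discard(client)


async def demo():
    """Two simulated clients plus the periodic reader, run for half a second."""
    STATE["value"] = 0
    a = FakeClient(['{"action": "plus"}', '{"action": "plus"}'])
    b = FakeClient(['{"action": "minus"}'])
    reader = asyncio.create_task(periodic_reader(0.02))
    handlers = [asyncio.create_task(handler(c)) for c in (a, b)]
    await asyncio.sleep(0.5)
    for c in (a, b):
        c.closed.set()
    await asyncio.gather(*handlers)
    reader.cancel()
    return a, b


if __name__ == "__main__":
    a, b = asyncio.run(demo())
    print(a.sent[-1], len(b.sent))
```

With the real library, the same idea would presumably mean passing a handler like this one to `websockets.serve` and starting the periodic task once with `asyncio.create_task` when the server starts, instead of starting a thread inside every connection handler.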

