Python如何接收数据并将其用作websocket服务器?

2024-04-19 07:16:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图构建的python应用程序面临一个问题:基本上,我有一个脚本,可以从Redis PUBSUB连接接收简单的json数据,我希望将这些数据作为websocket服务器提供给客户端。因此,基本上,每次我从redis连接接收到消息时,都必须使用WebSocket将该消息发送给客户端

这是我的基本代码:

我从redis pubsub连接接收数据的部分:

import json
import redis

redis_url = 'MY-URL'
channel = 'test'

connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)

pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)


for item in pubsub.listen():
    message = item['data']

    if type(message) != int:
        message = json.loads(message)
        print(message)

这是一个简单的websocket服务器,我使用websockets

import asyncio
import websockets

async def main(websocket, path):
    while True:
        await websockets.send('Some data')

start_server = websockets.serve(main, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

我缺少合并这两部分代码的方法。有没有办法做到这一点


Tags: 数据代码import服务器redisasynciojson消息
1条回答
网友
1楼 · 发布于 2024-04-19 07:16:03

解决方案1

在本例中,我们为每个新的websocket连接请求注册新的侦听器

import json
import redis
import asyncio
import websockets

redis_url = 'redis://localhost:6379/0'
channel = 'test'

connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)

pubsub = connection.pubsub(ignore_subscribe_messages=False)
pubsub.subscribe(channel)

async def main(websocket, path):
    for item in pubsub.listen():
        message = item['data']

        if type(message) != int:
            message = json.loads(message)
            print(message)
            await websocket.send(message)

start_server = websockets.serve(main, "localhost", 8765)

print("Started")
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

解决方案2

在这里,我们使用单列表器向多个客户端发送事件,这些客户端通过websocket注册,并向打开的websocket连接发送消息

import json
import redis
import gevent
from flask import Flask
from flask_sockets import Sockets

redis_url = 'redis://localhost:6379/0'
channel = 'test'

connection = redis.StrictRedis.from_url(redis_url, decode_responses=True)

class PubSubListener(object):
    def __init__(self):
        self.clients = []
        self.pubsub = connection.pubsub(ignore_subscribe_messages=False)
        self.pubsub.subscribe(**{channel: self.handler})
        self.thread = self.pubsub.run_in_thread(sleep_time=0.001)

    def register(self, client):
        self.clients.append(client)

    def handler(self, message):
        _message = message['data']

        if type(_message) != int:
            self.send(_message)

    def send(self, data):
        for client in self.clients:
            try:
                client.send(data)
            except Exception:
                self.clients.remove(client)

pslistener = PubSubListener()

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/echo')
def echo_socket(ws):
    pslistener.register(ws)

    while not ws.closed:
        gevent.sleep(0.1)

@app.route('/')
def hello():
    return 'Hello World!'


if __name__ == "__main__":
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler
    print("Started")
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()

相关问题 更多 >