在Django通道中,线程只能启动一次

2024-05-12 14:51:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我创建了一个简单的Django通道消费者,它应该连接到外部源,检索数据并将其发送到客户端。因此,用户打开页面>;消费者连接到外部服务并获取数据>;数据被发送到websocket

这是我的密码:

import json
from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer

from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio

client = Client('', '')

trades = client.get_recent_trades(symbol='BNBBTC')
bm = BinanceSocketManager(client)
class EchoConsumer(AsyncJsonWebsocketConsumer):


    async def connect(self):
        await self.accept()
        await self.send_json('test')


        bm.start_trade_socket('BNBBTC', self.process_message)
        bm.start()


    def process_message(self, message):
        JSON1 = json.dumps(message)
        JSON2 = json.loads(JSON1)

        #define variables
        Rate = JSON2['p']
        Quantity = JSON2['q']
        Symbol = JSON2['s']
        Order = JSON2['m']

        asyncio.create_task(self.send_json(Rate))
        print(Rate)

当我打开一页时,此代码起作用;但是,如果我尝试使用新帐户打开新窗口,它将抛出以下错误:

File "C:\Users\User\Desktop\Heroku\github\master\myapp\consumers.py", line 54, in connect
    bm.start()
  File "C:\Users\User\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
  threads can only be started once

我不熟悉频道,所以这是一个noob问题,但我如何解决这个问题?我想做的是:用户打开页面并获取数据,另一个用户打开页面并获取数据;没有办法吗?还是我只是误解了Django频道和WebSocket的工作原理


Tags: 数据django用户fromimportselfclientjson
3条回答

这里start()开始线程的活动

每个线程对象最多应调用一次。您已将BinanceSocketManager的全局对象设置为“bm”

如果在同一线程对象上调用多次,它将始终引发运行时错误

请参考下面提到的代码,它可能会帮助您

from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer

from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio


class EchoConsumer(AsyncJsonWebsocketConsumer):
    client = Client('', '')

    trades = client.get_recent_trades(symbol='BNBBTC')
    bm = BinanceSocketManager(client)

    async def connect(self):
        await self.accept()
        await self.send_json('test')


        self.bm.start_trade_socket('BNBBTC', self.process_message)
        self.bm.start()


    def process_message(self, message):
        JSON1 = json.dumps(message)
        JSON2 = json.loads(JSON1)

        #define variables
        Rate = JSON2['p']
        Quantity = JSON2['q']
        Symbol = JSON2['s']
        Order = JSON2['m']

        asyncio.create_task(self.send_json(Rate))
        print(Rate)

你真的需要一个辅助线程吗

class EchoConsumer(AsyncJsonWebsocketConsumer):

    symbol = ''

    async def connect(self):
        self.symbol = 'BNBBTC'
        # or, more probably, retrieve the value for "symbol" from query_string
        # so the client can specify which symbol he's interested into:
        #    socket = new WebSocket("ws://.../?symbol=BNBBTC");
        await self.accept()

    def process_message(self, message):
        # PSEUDO-CODE BELOW !
        if self.symbol == message['symbol']:
            await self.send({
                'type': 'websocket.send',
                'text': json.dumps(message),
            })

为了获得额外的灵活性,您还可以接受客户端的所有符号列表,而不是:

//HTML
socket = new WebSocket("ws://.../?symbols=XXX,YYY,ZZZ");

然后(在消费者中):

class EchoConsumer(AsyncJsonWebsocketConsumer):

    symbols = []

    async def connect(self):
        # here we need to parse "?symbols=XXX,YYY,ZZZ" ...
        # the code below has been stolen from another project of mine and should be suitably adapted
        params = urllib.parse.parse_qs(self.scope.get('query_string', b'').decode('utf-8'))
        try:
            self.symbols = json.loads(params.get('symbols', ['[]'])[0])
        except:
            self.symbols = []

    def process_message(self, message):
        if message['symbol'] in self.symbols:
            ...

我不是Django开发人员,但是如果我理解正确,函数connect被调用了不止一次,并且bm.start引用了最有可能在bm.start_trade_socket(或connect中的其他地方)中生成的相同线程。总之,当bm.start被调用时,一个线程被启动,当它再次被调用时,您会得到那个错误

相关问题 更多 >