Websockets消息仅在末尾发送,而不在使用async/await的实例中发送,产生嵌套的for循环

2024-06-09 05:52:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个计算量很大的过程,需要几分钟才能在服务器上完成。所以我想通过WebSocket将每次迭代的结果发送给客户机

整个应用程序都可以工作,但我的问题是,在整个模拟完成后,所有消息都以一大块的形式到达客户端。我在这里肯定遗漏了什么,因为我希望await websocket.send_json()在这个过程中发送消息,而不是在最后发送所有消息

服务器python(FastAPI)

# A very simplified abstraction of the actual app.

def simulate_intervals(data):
  for t in range(data.n_intervals):
    state = interval(data) # returns a JAX NumPy array
    yield state

def simulate(data):
  for key in range(data.n_trials):
     trial = simulate_intervals(data)
     yield trial

@app.websocket("/ws")
async def socket(websocket: WebSocket):

  await websocket.accept()
  while True:
    # Get model inputs from client
    data = await websocket.receive_text()
    # Minimal computation
    nodes = distributions(data)

    nodosJson = json.dumps(nodes, cls=NumpyEncoder)
    # I expect this message to be sent early on,
    # but the client gets it at the end with all the other messages. 
    await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})
    
    # Heavy computation
    trials = simulate(data)

    for trialI, trial in enumerate(trials):
      for stateI, state in enumerate(trial):
        stateString = json.dumps(state, cls=NumpyEncoder)

        await websocket.send_json(
          {
            "tipo": "estado",
            "datos": json.loads(stateString),
            "trialI": trialI,
            "stateI": stateI,
          }
        )

    await websocket.send_json({"tipo": "estado", "msg": "fin"})

为了完整起见,这里是基本的客户机代码

客户

const ws = new WebSocket('ws://localhost:8000/ws');

ws.onopen = () => {
  console.log('Conexión exitosa');
};

ws.onmessage = (e) => {
  const mensaje = JSON.parse(e.data);
  console.log(mensaje);
};

botonEnviarDatos.onclick = () => {
   ws.send(JSON.stringify({...}));
}

Tags: theinsendjson消息fordataws
1条回答
网友
1楼 · 发布于 2024-06-09 05:52:41

我无法让它像我在问题中所说的那样工作,仍然有兴趣听取任何人的意见,他们理解为什么不可能在不阻止的情况下发送多个异步消息

对于任何感兴趣的人,以下是我当前的解决方案:

来自客户端和服务器的乒乓消息

我改变了逻辑,使服务器和客户端不断地相互发送消息,而不是试图在来自客户端的单个请求中传输数据

这实际上比我最初的尝试要好得多,因为我可以检测套接字何时断开连接并停止服务器中的处理。基本上,如果客户端断开连接,就不会从该客户端发送新的数据请求,服务器也不会继续进行繁重的计算

服务器

# A very simplified abstraction of the actual app.

def simulate_intervals(data):
  for t in range(data.n_intervals):
    state = interval(data) # returns a JAX NumPy array
    yield state

def simulate(data):
  for key in range(data.n_trials):
     trial = simulate_intervals(data)
     yield trial

@app.websocket("/ws")
async def socket(websocket: WebSocket):

  await websocket.accept()
  while True:
    # Get messages from client
    data = await websocket.receive_text()
    
    # "tipo" is basically the type of data being sent from client or server to the other one.
    # In this case, "tipo": "inicio" is the client sending inputs and requesting for a certain data in response.
    if data["tipo"] == "inicio":
      # Minimal computation
      nodes = distributions(data)

      nodosJson = json.dumps(nodes, cls=NumpyEncoder)
      # In this first interaction, the client gets the first message without delay. 
      await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})

      # Since this is a generator (def returns yield) it does not actually
      # trigger that actual computationally heavy process. 
      trials = simulate(data)
      
      # define some initial variables to count the iterations
      trialI = 0
      stateI = 0
      trialsLen = args.number_trials
      statesLen = 600
      
      # load the first trial (also a generator)
      # without the for loop used before, the counters and next()
      # allow us to do the same as being done before in the for loop
      trial = next(trials)

      # With the use of generators and next() it is possible to keep
      # this first message light on the server and send the first response
      # as quickly as possible.
    
    # This type of message asks for the next instance of the simluation
    # without processing the entire model.
    elif data["tipo"] == "sim":
      # check if we are within the limits (before this was a nested for loop)
      if trialI < trialsLen and stateI < statesLen:
        # Trigger the next instance of the simulation
        state = next(trial)
        # update counter
        stateI = stateI + 1
        
        # Send the message with 1 instance of the simulation.
        # 
        stateString = json.dumps(state, cls=NumpyEncoder)
        await websocket.send_json(
          {
             "tipo": "estado",
             "datos": json.loads(stateString),
             "trialI": trialI,
             "stateI": stateI,
          }
        )
        
        # Check if the second loop is done
        if stateI == statesLen:
          # update counter of first loop
          trialI = trialI + 1
          # update counter of second loop
          stateI = 0
          
          # Check if there are more pending trials,
          # otherwise stop and notify the client we are done.
          try:
            trial = next(trials)
          except StopIteration:
            await websocket.send_json({"tipo": "fin"})

客户

只是实际改变的部分:

ws.onmessage = (e) => {
  const mensaje = JSON.parse(e.data);
  
  // Simply check the type of incoming message so it can be processed
  if (mensaje.tipo === 'fin') {
    viz.calcularResultados();
  } else if (mensaje.tipo === 'nodos') {
    viz.pintarNodos(mensaje.datos);
  } else if (mensaje.tipo === 'estado') {
    viz.sumarEstado(mensaje.datos);
  }

  // After receiving a message, ping the server for the next one 
  ws.send(
    JSON.stringify({
      tipo: 'sim',
    })
  );
};

这似乎是保持服务器和客户端协同工作的合理解决方案。我能够在客户端显示长时间模拟的进度,用户体验要比等待服务器响应的时间长得多。希望它能帮助其他人解决类似的问题

相关问题 更多 >