使用Grpc的最少连接负载均衡

0 投票
1 回答
28 浏览
提问于 2025-04-12 19:42

node2.py 代码

least_connection.proto code
Node overloaded -- starting load balancing process
Traceback (most recent call last):
File "D:\lab7p2\least connection\node2.py", line 73, in <module>
node.check_load()
File "D:\lab7p2\least connection\node2.py", line 46, in check_load
self.balance_load()
File "D:\lab7p2\least connection\node2.py", line 36, in balance_load
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Message must be initialized with a dict: least_connection.LoadTransferMessage

同样的,node1 和 node3 的代码

原型代码

import threading
import time
import grpc
import least_connection_pb2 as least_connection_pb2
import least_connection_pb2_grpc as least_connection_pb2_grpc
from concurrent import futures

class Node(least_connection_pb2_grpc.LeastConnectionLoadBalancerServicer):
    def __init__(self):
        self.node_id = '2'
        self.server_address = 'localhost:50052'
        self.connections = 85  # Initial number of connections
        self.threshold = 80    # Threshold for load balancing
        self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]  # List of tuples (server address, connections)

    def get_least_connections_server(self):
        least_connections_server = min(self.servers, key=lambda server: server[1], default=None)
        return least_connections_server

    def balance_load(self):
        least_connections_server = self.get_least_connections_server()
        if least_connections_server is None:
            print("No server available for load transfer")
            return

        least_connections_address, least_connections_count = least_connections_server

        transfer_load = self.connections - self.threshold
        if transfer_load <= 0:
            print("No load to transfer")
            return

        channel = grpc.insecure_channel(self.server_address)
        stub = least_connection_pb2_grpc.LeastConnectionLoadBalancerStub(channel)

        request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
        response = stub.TransferLoad(request)
        if response.success:
            print(f"Node {self.node_id} --> {transfer_load} Units --> Server {least_connections_address}")
            self.connections -= transfer_load

    def check_load(self):
        print(f"Current node load: {self.connections}")
        if self.connections > self.threshold:
            print("Node overloaded -- starting load balancing process")
            self.balance_load()

    def getServerWithLeastConnections(self, request, context):
        least_connections_server = self.get_least_connections_server()
        server_obj = least_connection_pb2.Server(name=least_connections_server[0], current_connections=least_connections_server[1])
        return least_connection_pb2.LeastConnectionServer(server=server_obj, connections=least_connections_server[1])

def serve(node):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    least_connection_pb2_grpc.add_LeastConnectionLoadBalancerServicer_to_server(node, server)
    server.add_insecure_port('localhost:50052')
    server.start()
    server.wait_for_termination()

if __name__ == "__main__":
    node = Node()
    server_thread = threading.Thread(target=serve, args=(node,))
    server_thread.start()

    # Wait for a moment to ensure the server is up and running
    time.sleep(1)

    try:
        while True:
            user_input = input("Enter 1 to check status: ")

            if user_input == "1":
                node.check_load()
            print("--------------------------------------------")
    except KeyboardInterrupt:
        pass

syntax = "proto3";

package least_connection;

message Server {
  string name = 1;
  int32 current_connections = 2;
}

message RequestData {
  string request_id = 1;
}

message LeastConnectionServer {
  Server server = 1;
  int32 connections = 2;
}

message LoadTransferMessage {
  Server server = 1;
  int32 load = 2;
}

service LeastConnectionLoadBalancer {
  rpc getServerWithLeastConnections(RequestData) returns (LeastConnectionServer);
  rpc TransferLoad(LoadTransferMessage) returns (LoadTransferResponse);
}

message LoadTransferResponse {
  bool success = 1;
}

1 个回答

0

你的 Node 类里有一个实例属性 server,它的定义是:

self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]

这个属性的类型是一个包含元组的列表,元组里有一个字符串和一个整数。

所以在 balance_load 函数中,在这行代码之后:

least_connections_server = self.get_least_connections_server()

least_connections_server 里包含了一个字符串和一个整数的元组。

接下来你会把这个元组拆开:

least_connections_address, least_connections_count = least_connections_server

得到 least_connections_address(字符串类型)和 least_connections_count(整数类型)。

根据你的原型,LoadTransferMessage 需要一个类型为 Serverserver 和一个整数类型的 load

request = least_connection_pb2.LoadTransferMessage(
  server=least_connections_address,
  load=int(transfer_load),
)

但是 least_connections_address 是一个 str,而不是一个 Server

你想要的是(类似于):

server = least_connection_pb2.Server(
  name=least_connections_address,
  current_connections=least_connections_count,
)

request = least_connection_pb2.LoadTransferMessage(
  server=server,
  load=int(transfer_load),
)

撰写回答