使用Grpc的最少连接负载均衡
node2.py 代码
least_connection.proto code
Node overloaded -- starting load balancing process
Traceback (most recent call last):
File "D:\lab7p2\least connection\node2.py", line 73, in <module>
node.check_load()
File "D:\lab7p2\least connection\node2.py", line 46, in check_load
self.balance_load()
File "D:\lab7p2\least connection\node2.py", line 36, in balance_load
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Message must be initialized with a dict: least_connection.LoadTransferMessage
同样的,node1 和 node3 的代码
原型代码
import threading
import time
import grpc
import least_connection_pb2 as least_connection_pb2
import least_connection_pb2_grpc as least_connection_pb2_grpc
from concurrent import futures
class Node(least_connection_pb2_grpc.LeastConnectionLoadBalancerServicer):
def __init__(self):
self.node_id = '2'
self.server_address = 'localhost:50052'
self.connections = 85 # Initial number of connections
self.threshold = 80 # Threshold for load balancing
self.servers = [('localhost:50051', 70), ('localhost:50053', 65)] # List of tuples (server address, connections)
def get_least_connections_server(self):
least_connections_server = min(self.servers, key=lambda server: server[1], default=None)
return least_connections_server
def balance_load(self):
least_connections_server = self.get_least_connections_server()
if least_connections_server is None:
print("No server available for load transfer")
return
least_connections_address, least_connections_count = least_connections_server
transfer_load = self.connections - self.threshold
if transfer_load <= 0:
print("No load to transfer")
return
channel = grpc.insecure_channel(self.server_address)
stub = least_connection_pb2_grpc.LeastConnectionLoadBalancerStub(channel)
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
response = stub.TransferLoad(request)
if response.success:
print(f"Node {self.node_id} --> {transfer_load} Units --> Server {least_connections_address}")
self.connections -= transfer_load
def check_load(self):
print(f"Current node load: {self.connections}")
if self.connections > self.threshold:
print("Node overloaded -- starting load balancing process")
self.balance_load()
def getServerWithLeastConnections(self, request, context):
least_connections_server = self.get_least_connections_server()
server_obj = least_connection_pb2.Server(name=least_connections_server[0], current_connections=least_connections_server[1])
return least_connection_pb2.LeastConnectionServer(server=server_obj, connections=least_connections_server[1])
def serve(node):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
least_connection_pb2_grpc.add_LeastConnectionLoadBalancerServicer_to_server(node, server)
server.add_insecure_port('localhost:50052')
server.start()
server.wait_for_termination()
if __name__ == "__main__":
node = Node()
server_thread = threading.Thread(target=serve, args=(node,))
server_thread.start()
# Wait for a moment to ensure the server is up and running
time.sleep(1)
try:
while True:
user_input = input("Enter 1 to check status: ")
if user_input == "1":
node.check_load()
print("--------------------------------------------")
except KeyboardInterrupt:
pass
syntax = "proto3";
package least_connection;
message Server {
string name = 1;
int32 current_connections = 2;
}
message RequestData {
string request_id = 1;
}
message LeastConnectionServer {
Server server = 1;
int32 connections = 2;
}
message LoadTransferMessage {
Server server = 1;
int32 load = 2;
}
service LeastConnectionLoadBalancer {
rpc getServerWithLeastConnections(RequestData) returns (LeastConnectionServer);
rpc TransferLoad(LoadTransferMessage) returns (LoadTransferResponse);
}
message LoadTransferResponse {
bool success = 1;
}
1 个回答
0
你的 Node
类里有一个实例属性 server
,它的定义是:
self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]
这个属性的类型是一个包含元组的列表,元组里有一个字符串和一个整数。
所以在 balance_load
函数中,在这行代码之后:
least_connections_server = self.get_least_connections_server()
least_connections_server
里包含了一个字符串和一个整数的元组。
接下来你会把这个元组拆开:
least_connections_address, least_connections_count = least_connections_server
得到 least_connections_address
(字符串类型)和 least_connections_count
(整数类型)。
根据你的原型,LoadTransferMessage
需要一个类型为 Server
的 server
和一个整数类型的 load
。
request = least_connection_pb2.LoadTransferMessage(
server=least_connections_address,
load=int(transfer_load),
)
但是 least_connections_address
是一个 str
,而不是一个 Server
。
你想要的是(类似于):
server = least_connection_pb2.Server(
name=least_connections_address,
current_connections=least_connections_count,
)
request = least_connection_pb2.LoadTransferMessage(
server=server,
load=int(transfer_load),
)