如何确保在故障或挂起时关闭负载均衡器中的所有连接?

2 投票
1 回答
562 浏览
提问于 2025-04-15 16:48

我正在尝试写一个简单的负载均衡器。它运行得还不错,直到有一个服务器(BalanceServer)没有关闭连接,这时候... 客户端(ReverseProxy)断开了连接,但与BalanceServer的连接却保持打开状态。我试着在ReverseProxy.connectionLost中添加一个回调(#3),以便在服务器断开连接时关闭与其中一台服务器的连接(clientLoseConnection),但在那个时候,ServerWriter是空的,我无法在#1和#2中终止它。


我该如何确保当一方断开连接时,所有连接都能关闭?我想如果在客户端和某台服务器都挂起的时候加个超时设置也不错,但我该如何添加这个超时,以便它能在两个连接上都有效呢?


from twisted.internet.protocol import Protocol, Factory, ClientCreator
from twisted.internet import reactor, defer
from collections import namedtuple

BalanceServer = namedtuple('BalanceServer', 'host port')

SERVER_LIST = [BalanceServer('127.0.0.1', 8000), BalanceServer('127.0.0.1', 8001)]

def getServer(servers):
    while True:
        for server in servers:
            yield server

# this writes to one of balance servers and responds to client with callback 'clientWrite'
class ServerWriter(Protocol):
    def sendData(self, data):
        self.transport.write(data)

    def dataReceived(self, data):
        self.clientWrite(data)

    def connectionLost( self, reason ):
        self.clientLoseConnection()

# callback for reading data from client to send it to server and get response to client again    
def transferData(serverWriter, clientWrite, clientLoseConnection, data):
    if serverWriter:
        serverWriter.clientWrite = clientWrite
        serverWriter.clientLoseConnection = clientLoseConnection
        serverWriter.sendData(data)

def closeConnection(serverWriter):
    if serverWriter: #1 this is null
        #2 So connection is not closed and hangs there, till BalanceServer close it 
        serverWriter.transport.loseConnection()

# accepts clients
class ReverseProxy(Protocol):
    def connectionMade(self):
        server = self.factory.getServer()
        self.serverWriter = ClientCreator(reactor, ServerWriter)
        self.client = self.serverWriter.connectTCP( server.host, server.port )

    def dataReceived(self, data):
        self.client.addCallback(transferData, self.transport.write, 
                    self.transport.loseConnection, data )

    def connectionLost(self, reason):
        self.client.addCallback(closeConnection) #3 adding close doesn't work


class ReverseProxyFactory(Factory):
    protocol = ReverseProxy
    def __init__(self, serverGenerator):
        self.getServer = serverGenerator

plainFactory = ReverseProxyFactory( getServer(SERVER_LIST).next )
reactor.listenTCP( 7777, plainFactory )
reactor.run()

1 个回答

1

你可以看看 twisted.internet.protocols.portforward,里面有个例子展示了如何连接两个连接并在需要时断开它们。或者,你也可以直接使用 txloadbalancer,这样就不用自己写代码了。

不过要注意,如果连接上没有任何数据传输,loseConnection 是不会强制断开连接的。所以如果你的应用没有发送任何数据或者没有进行“心跳”检测,连接可能就一直保持着,不会关闭。这是Twisted里一个长期存在的bug。 其实,这是最久的一个bug。也许你想来帮忙修复一下呢 :).

撰写回答