使用asyncore创建客户端/服务器模型的交互会话
我正在尝试创建一个程序,让很多客户端能够同时连接到一个服务器。这些连接在服务器端应该是互动的,也就是说,在客户端连接后,我可以从服务器向客户端发送请求。
下面这个asyncore的示例代码只是简单地回复一个回声,我需要的不是回声,而是能够互动地访问每个会话。也就是说,我希望能够在后台管理每个连接,直到我决定与它互动。如果我有100个会话,我想选择一个特定的会话,或者选择所有会话,或者选择其中的一部分,向它们发送命令。此外,我也不太确定asyncore库是否是解决这个问题的最佳选择,任何帮助都非常感谢。
import asyncore
import socket
class EchoHandler(asyncore.dispatcher_with_send):
def handle_read(self):
data = self.recv(8192)
if data:
self.send(data)
class EchoServer(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
print 'Incoming connection from %s' % repr(addr)
handler = EchoHandler(sock)
server = EchoServer('localhost', 8080)
asyncore.loop()
2 个回答
审查需求
你想要:
- 以客户端/服务器的方式进行远程调用
- 可能使用TCP通信
- 在调用中使用会话
关于你想如何使用会话并不太清楚,所以我会认为会话只是调用参数之一,它在服务器和客户端都有一定的意义,因此我会跳过实现这部分。
zmq
:简单可靠的远程消息传递平台
ZeroMQ是一个轻量级的消息传递平台,不需要复杂的服务器基础设施。它可以处理多种消息模式,下面的例子展示了使用多部分消息的请求/回复模式。
还有很多其他选择,你可以使用简单的消息,编码成像JSON这样的格式,而不必使用多部分消息。
server.py
import zmq
class ZmqServer(object):
def __init__(self, url="tcp://*:5555"):
context = zmq.Context()
self.sock = context.socket(zmq.REP)
self.sock.bind(url)
self.go_on = False
def echo(self, message, priority=None):
priority = priority or "not urgent"
msg = "Echo your {priority} message: '{message}'"
return msg.format(**locals())
def run(self):
self.go_on = True
while self.go_on:
args = self.sock.recv_multipart()
if 1 <= len(args) <= 2:
code = "200"
resp = self.echo(*args)
else:
code = "401"
resp = "Bad request, 1-2 arguments expected."
self.sock.send_multipart([code, resp])
def stop(self):
self.go_on = False
if __name__ == "__main__":
ZmqServer().run()
client.py
import zmq
import time
class ZmqClient(object):
def __init__(self, url="tcp://localhost:5555"):
context = zmq.Context()
self.socket = context.socket(zmq.REQ)
self.socket.connect(url)
def call_echo(self, message, priority=None):
args = [message]
if priority:
args.append(priority)
self.socket.send_multipart(args)
code, resp = self.socket.recv_multipart()
assert code == "200"
return resp
def too_long_call(self, message, priority, extrapriority):
args = [message, priority, extrapriority]
self.socket.send_multipart(args)
code, resp = self.socket.recv_multipart()
assert code == "401"
return resp
def test_server(self):
print "------------------"
rqmsg = "Hi There"
print "rqmsg", rqmsg
print "response", self.call_echo(rqmsg)
print "------------------"
time.sleep(2)
rqmsg = ["Hi There", "very URGENT"]
print "rqmsg", rqmsg
print "response", self.call_echo(*rqmsg)
print "------------------"
time.sleep(2)
time.sleep(2)
rqmsg = []
print "too_short_call"
print "response", self.too_long_call("STOP", "VERY URGENT", "TOO URGENT")
print "------------------"
if __name__ == "__main__":
ZmqClient().test_server()
玩玩这个小玩具
启动服务器:
$ python server.py
现在服务器正在运行,等待请求。
现在启动客户端:
$ python client.py
------------------
rqmsg Hi There
response Echo your not urgent message: 'Hi There'
------------------
rqmsg ['Hi There', 'very URGENT']
response Echo your very URGENT message: 'Hi There'
------------------
too_short_call
response Bad request, 1-2 arguments expected.
------------------
现在可以稍微实验一下:
- 先启动客户端,然后启动服务器
- 在处理过程中停止服务器,稍后再重启
- 启动多个客户端
所有这些场景都可以通过zmq
处理,而无需增加额外的Python代码。
结论
ZeroMQ提供了非常方便的远程消息传递解决方案,试着计算一下与消息相关的代码行数,并与任何其他提供相同稳定性水平的解决方案进行比较。
会话(这是提问者提到的部分)可以被视为调用的额外参数。正如我们看到的,多个参数并不是问题。
维护会话时,可以使用不同的后端,它们可以存储在内存中(对于单个服务器实例)、数据库中,或者在memcache或Redis中。这个回答没有进一步详细说明会话,因为不太清楚预期的用途是什么。
这是一个Twisted服务器:
import sys
from twisted.internet.task import react
from twisted.internet.endpoints import serverFromString
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
class HubConnection(LineReceiver, object):
def __init__(self, hub):
self.name = b'unknown'
self.hub = hub
def connectionMade(self):
self.hub.append(self)
def lineReceived(self, line):
words = line.split(" ", 1)
if words[0] == b'identify':
self.name = words[1]
else:
for connection in self.hub:
connection.sendLine("<{}> {}".format(
self.name, line
).encode("utf-8"))
def connectionLost(self, reason):
self.hub.remove(self)
def main(reactor, listen="tcp:4321"):
hub = []
endpoint = serverFromString(reactor, listen)
endpoint.listen(Factory.forProtocol(lambda: HubConnection(hub)))
return Deferred()
react(main, sys.argv[1:])
还有一个命令行客户端:
import sys
from twisted.internet.task import react
from twisted.internet.endpoints import clientFromString
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.protocol import Factory
from twisted.internet.stdio import StandardIO
from twisted.protocols.basic import LineReceiver
from twisted.internet.fdesc import setBlocking
class HubClient(LineReceiver):
def __init__(self, name, output):
self.name = name
self.output = output
def lineReceived(self, line):
self.output.transport.write(line + b"\n")
def connectionMade(self):
self.sendLine("identify {}".format(self.name).encode("utf-8"))
def say(self, words):
self.sendLine("say {}".format(words).encode("utf-8"))
class TerminalInput(LineReceiver, object):
delimiter = "\n"
hubClient = None
def lineReceived(self, line):
if self.hubClient is None:
self.output.transport.write("Connecting, please wait...\n")
else:
self.hubClient.sendLine(line)
@inlineCallbacks
def main(reactor, name, connect="tcp:localhost:4321"):
endpoint = clientFromString(reactor, connect)
terminalInput = TerminalInput()
StandardIO(terminalInput)
setBlocking(0)
hubClient = yield endpoint.connect(
Factory.forProtocol(lambda: HubClient(name, terminalInput))
)
terminalInput.transport.write("Connecting...\n")
terminalInput.hubClient = hubClient
terminalInput.transport.write("Connected.\n")
yield Deferred()
react(main, sys.argv[1:])
它们实现了一个基本的聊天服务器。希望这些代码比较容易理解;你可以在一个终端运行 python hub_server.py
来启动服务器,在第二个终端运行 python hub_client.py alice
来启动一个名为Alice的客户端,在第三个终端运行 python hub_client.py bob
来启动一个名为Bob的客户端;然后在Alice和Bob的会话中输入内容,你就能看到它是怎么工作的。