Python多线程ZeroMQ请求-回复
我想用Python和ZeroMQ实现一个请求-回复的模式,并且使用多线程。
在Python中,当一个新客户连接到服务器时,我可以创建一个新的线程。这个线程会处理与这个特定客户的所有通信,直到连接关闭:
# Thread that will handle client's requests
class ClientThread(threading.Thread):
# Implementation...
def __init__(self, socket):
threading.Thread.__init__(self)
self.socket = socket
def run(self):
while keep_alive:
# Thread can receive from client
data = self.socket.recv(1024)
# Processing...
# And send back a reply
self.socket.send(reply)
while True:
# The server accepts an incoming connection
conn, addr = sock.accept()
# And creates a new thread to handle the client's requests
newthread = ClientThread(conn)
# Starting the thread
newthread.start()
那么,使用ZeroMQ也能做到这一点吗[*]?我见过一些关于ZeroMQ和Python的多线程示例,但它们都是在一开始就创建一个固定数量的线程池,似乎更倾向于负载均衡。
[*] 请注意,我想要的是保持客户和它的线程之间的连接活着,因为这个线程会期待来自客户的多个请求消息,并且会存储在消息之间需要保留的信息(比如:一个变量计数器,在收到新的请求消息时会增加它的值;所以每个线程都有自己的变量,其他客户永远不能访问这个线程)。新客户 = 新线程。
1 个回答
没错,ZeroMQ 是一个强大的工具箱
不过,最大的惊喜在于,ZeroMQ 的 socket 结构比你在示例中使用的普通 socket 要复杂得多。
{ aZmqContext -> aZmqSocket -> aBehavioralPrimitive }
ZeroMQ 在一个被称为 "singleton" 的 ZMQ-Context
的框架下,构建了一个非常出色的抽象框架,这个框架是唯一可以被“共享”的东西。
线程之间不应该“共享”其他“衍生”对象,更不要说它们的状态,因为这里有一个强大的分布式责任框架架构,这样做是为了保持设计的整洁,同时也能实现高性能和低延迟。
对于所有的 ZMQ-Socket
,你可以想象成一个更聪明的分层结构,这样你就可以把 I/O 操作的烦恼交给 ZMQ-Context
来处理——因此保持连接、时间管理和公平排队等问题对你来说就“看不见”了……同时还有一种正式的通信模式 行为(由你选择的 ZMQ-Socket
类型决定)。
最后
ZeroMQ 和类似的 nanomsg 库就像乐高一样的项目,它们赋予你作为架构师和设计师的能力,远比你一开始想象的要多。
这样你就可以专注于分布式系统的行为,而不是浪费时间和精力去解决那些麻烦的 socket 消息传递问题。
(绝对值得看看 Pieter Hintjens 的两本书,他是 ZeroMQ 的共同创始人。在书中你会发现很多“哦,原来如此”的时刻。)
……而且作为额外的好处——你可以在一个与传输无关的通用环境中完成这一切,无论是通过 inproc://
传递消息,还是通过 ipc://
,甚至可以在 tcp://
层同时收听和发送。
编辑#1
2014-08-19 17:00 [UTC+0000]
请查看下面的评论,并进一步审查你的——无论是基础的还是高级的——设计选项,以应对 容易失败 的处理、负载均衡 的 REP 工作队列、可扩展 的分布式处理和 容错模式 的 REP 工作二进制启动处理。
没有一堆模拟的 SLOC(源代码行数),也没有单一的代码示例可以适用于所有情况。
在设计分布式消息系统时,这一点尤其重要。
"""REQ/REP modified with QUEUE/ROUTER/DEALER add-on ---------------------------
Multithreaded Hello World server
Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
"""
import time
import threading
import zmq
print "ZeroMQ version sanity-check: ", zmq.__version__
def aWorker_asRoutine( aWorker_URL, aContext = None ):
"""Worker routine"""
#Context to get inherited or create a new one trick------------------------------
aContext = aContext or zmq.Context.instance()
# Socket to talk to dispatcher --------------------------------------------------
socket = aContext.socket( zmq.REP )
socket.connect( aWorker_URL )
while True:
string = socket.recv()
print( "Received request: [ %s ]" % ( string ) )
# do some 'work' -----------------------------------------------------------
time.sleep(1)
#send reply back to client, who asked --------------------------------------
socket.send( b"World" )
def main():
"""Server routine"""
url_worker = "inproc://workers"
url_client = "tcp://*:5555"
# Prepare our context and sockets ------------------------------------------------
aLocalhostCentralContext = zmq.Context.instance()
# Socket to talk to clients ------------------------------------------------------
clients = aLocalhostCentralContext.socket( zmq.ROUTER )
clients.bind( url_client )
# Socket to talk to workers ------------------------------------------------------
workers = aLocalhostCentralContext.socket( zmq.DEALER )
workers.bind( url_worker )
# --------------------------------------------------------------------||||||||||||--
# Launch pool of worker threads --------------< or spin-off by one in OnDemandMODE >
for i in range(5):
thread = threading.Thread( target = aWorker_asRoutine, args = ( url_worker, ) )
thread.start()
zmq.device( zmq.QUEUE, clients, workers )
# ----------------------|||||||||||||||------------------------< a fair practice >--
# We never get here but clean up anyhow
clients.close()
workers.close()
aLocalhostCentralContext.term()
if __name__ == "__main__":
main()