Python多线程ZeroMQ请求-回复

3 投票
1 回答
6983 浏览
提问于 2025-04-18 17:40

我想用Python和ZeroMQ实现一个请求-回复的模式,并且使用多线程。

在Python中,当一个新客户连接到服务器时,我可以创建一个新的线程。这个线程会处理与这个特定客户的所有通信,直到连接关闭:

# Thread that will handle client's requests
class ClientThread(threading.Thread):
    # Implementation...
    def __init__(self, socket):
        threading.Thread.__init__(self)
        self.socket = socket
    def run(self):
        while keep_alive:
            # Thread can receive from client
            data = self.socket.recv(1024)
            # Processing...
            # And send back a reply
            self.socket.send(reply)

while True:
    # The server accepts an incoming connection
    conn, addr = sock.accept()
    # And creates a new thread to handle the client's requests
    newthread = ClientThread(conn)
    # Starting the thread
    newthread.start()

那么,使用ZeroMQ也能做到这一点吗[*]?我见过一些关于ZeroMQ和Python的多线程示例,但它们都是在一开始就创建一个固定数量的线程池,似乎更倾向于负载均衡。

[*] 请注意,我想要的是保持客户和它的线程之间的连接活着,因为这个线程会期待来自客户的多个请求消息,并且会存储在消息之间需要保留的信息(比如:一个变量计数器,在收到新的请求消息时会增加它的值;所以每个线程都有自己的变量,其他客户永远不能访问这个线程)。新客户 = 新线程。

1 个回答

5

没错,ZeroMQ 是一个强大的工具箱

不过,最大的惊喜在于,ZeroMQ 的 socket 结构比你在示例中使用的普通 socket 要复杂得多。

{ aZmqContext -> aZmqSocket -> aBehavioralPrimitive }

ZeroMQ 在一个被称为 "singleton" 的 ZMQ-Context 的框架下,构建了一个非常出色的抽象框架,这个框架是唯一可以被“共享”的东西。

线程之间不应该“共享”其他“衍生”对象,更不要说它们的状态,因为这里有一个强大的分布式责任框架架构,这样做是为了保持设计的整洁,同时也能实现高性能和低延迟。

对于所有的 ZMQ-Socket,你可以想象成一个更聪明的分层结构,这样你就可以把 I/O 操作的烦恼交给 ZMQ-Context 来处理——因此保持连接、时间管理和公平排队等问题对你来说就“看不见”了……同时还有一种正式的通信模式 行为(由你选择的 ZMQ-Socket 类型决定)。

最后

ZeroMQ 和类似的 nanomsg 库就像乐高一样的项目,它们赋予你作为架构师和设计师的能力,远比你一开始想象的要多。

在这里输入图片描述

这样你就可以专注于分布式系统的行为,而不是浪费时间和精力去解决那些麻烦的 socket 消息传递问题。

(绝对值得看看 Pieter Hintjens 的两本书,他是 ZeroMQ 的共同创始人。在书中你会发现很多“哦,原来如此”的时刻。)

……而且作为额外的好处——你可以在一个与传输无关的通用环境中完成这一切,无论是通过 inproc:// 传递消息,还是通过 ipc://,甚至可以在 tcp:// 层同时收听和发送。

编辑#12014-08-19 17:00 [UTC+0000]

请查看下面的评论,并进一步审查你的——无论是基础的还是高级的——设计选项,以应对 容易失败 的处理、负载均衡 的 REP 工作队列、可扩展 的分布式处理和 容错模式 的 REP 工作二进制启动处理。

没有一堆模拟的 SLOC(源代码行数),也没有单一的代码示例可以适用于所有情况。

在设计分布式消息系统时,这一点尤其重要。

"""REQ/REP modified with QUEUE/ROUTER/DEALER add-on ---------------------------

   Multithreaded Hello World server

   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>

"""
import time
import threading
import zmq

print "ZeroMQ version sanity-check: ", zmq.__version__

def aWorker_asRoutine( aWorker_URL, aContext = None ):
    """Worker routine"""
    #Context to get inherited or create a new one trick------------------------------
    aContext = aContext or zmq.Context.instance()
    
    # Socket to talk to dispatcher --------------------------------------------------
    socket = aContext.socket( zmq.REP )
    
    socket.connect( aWorker_URL )
    
    while True:

        string  = socket.recv()

        print( "Received request: [ %s ]" % ( string ) )
        
        # do some 'work' -----------------------------------------------------------
        time.sleep(1)

        #send reply back to client, who asked --------------------------------------
        socket.send( b"World" )

def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets ------------------------------------------------
    aLocalhostCentralContext = zmq.Context.instance()

    # Socket to talk to clients ------------------------------------------------------
    clients = aLocalhostCentralContext.socket( zmq.ROUTER )
    clients.bind( url_client )

    # Socket to talk to workers ------------------------------------------------------
    workers = aLocalhostCentralContext.socket( zmq.DEALER )
    workers.bind( url_worker )

    # --------------------------------------------------------------------||||||||||||--
    # Launch pool of worker threads --------------< or spin-off by one in OnDemandMODE >
    for i in range(5):
        thread = threading.Thread( target = aWorker_asRoutine, args = ( url_worker, ) )
        thread.start()

    zmq.device( zmq.QUEUE, clients, workers )

    # ----------------------|||||||||||||||------------------------< a fair practice >--
    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    aLocalhostCentralContext.term()

if __name__ == "__main__":
    main()

撰写回答