Python中使用ZMQ的客户端/服务器配置中的嗅探器/监视器

6 投票
1 回答
4108 浏览
提问于 2025-04-18 07:40

我用ZeroMQ实现了一个客户端/服务器的通信,现在想加一个监控工具,来捕捉这两者之间的交流。

              client <---------> server
              (REQ)       |         (REP)
                          | 
                          |
                          v
                        sniffer  <-this is what I want to add

假设客户端和服务器是通过5555这个端口进行通信的,我该如何添加一个监控工具来监听同一个端口呢?有没有办法区分哪些消息是来自客户端,哪些是来自服务器的?有没有人能分享一下经验?

根据Jan的回答进行了编辑

配置会变成这样:

client [REQ]----->[ROUTER:4444] monitor [DEALER]------->[REP:5555] server
                              [PUB:7777]
                                  ^
                                  |
                                  |
                                  |
                                  | 
                                [SUB] 
                            monitorclient(sniffer)  <-this is what I want to add

箭头表示连接的方向(指向绑定的端口)。

消息的流动是这样的:

  • 客户端 -> 监控工具 -> 服务器 -> 监控工具 -> 客户端
  • 还有 监控工具 -> 监控客户端

这里有一张更清晰的图 在这里

1 个回答

15

为了进行数据监控,我们需要一个中间部分。

zmq提供了几种选择:

  • 自己写一个程序,一边接收请求,一边发送出去,获取响应后再发送给原请求者,同时把这些流量记录下来。
  • 使用zmq.proxy - 不过,这需要最新版本的libzmq(即zmq.zmq_version_info() >= 3),而我现在的Ubuntu 14.04上还没有这个版本,所以我就不考虑这个了。
  • 使用MonitoredQueue - 这可能是你想要的。这提供了一个循环,可以在前端和后端之间交换消息,同时把它们发布到另一个socket。

计划

这个解决方案基于pyzmq文档中的MonitoredQueue示例

服务器绑定到5555端口

服务器将绑定到5555端口。与其他示例不同,我会把你的服务器作为固定部分,不会改变它与MonitoredQueue的连接。不过,这样的交换并不会造成问题,只要你正确调整MonitoredQueue即可。

MonitoredQueue绑定到4444端口,连接到5555端口,并在7777端口发布流量

MonitoredQueue位于客户端和服务器之间。它监听4444端口,向服务器发送请求,并将响应返回给客户端。同时,任何经过的消息都会在PUB socket上以“in”或“out”作为前缀进行发布。稍后我们会看到,这些消息不仅包含前缀和请求/响应,还包含客户端的身份信息。

客户端连接到4444端口

客户端可以直接连接到5555端口的服务器,但这样就无法监控流量。因此,我们将客户端连接到4444端口,MonitoredQueue在这里等待服务器并进行监控。

你会发现,客户端和服务器不需要修改任何代码就能参与这个交换。

实际代码

server.py

在我们的例子中,服务器期望一个可以转换为整数的字符串,并返回一个值加倍后的字符串。

import zmq

def double_server(server_url="tcp://*:5555"):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(server_url)
    print "server started..."
    while True:
        req = socket.recv()
        print "server received request", req
        result = str(2*int(req))
        socket.send(result)
        print "server replied with", result

if __name__ == "__main__":
    double_server()

client.py

我们的客户端将尝试5次在本地的4444端口请求一些结果。

import zmq

def client(server_url="tcp://localhost:4444"):
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    # socket.setsockopt(zmq.IDENTITY, "client_id_abc") # see Conclusions
    socket.connect(server_url)

    for i in range(5):
        print "request", i
        socket.send(str(i))
        res = socket.recv()
        print i, "result: ", res

if __name__ == "__main__":
    client()

你可以现在尝试直接连接到5555端口,看看是否能工作,但为了监控,我们必须让它与MonitoredQueue通信。

monitor.py

这里是所有的魔法所在。pyzmq已经提供了设备MonitoredQueue,所以我们可以直接使用它。

import zmq
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes

def monitoredqueue(frontend_url="tcp://*:4444", server_url="tcp://localhost:5555", capture_url="tcp://*:7777"):
    mondev = MonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, asbytes("in"), asbytes("out"))
    mondev.bind_in(frontend_url)
    mondev.connect_out(server_url)
    mondev.bind_mon(capture_url)
    mondev.setsockopt_in(zmq.HWM, 1)
    mondev.start()
    print "monitored queue started"

if __name__ == "__main__":
    monitoredqueue()

关于socket类型和别名的说明:

  • zmq.ROUTER以前叫做zmq.XREP
  • zmq.DEALER以前叫做zmq.XREQ
  • 这些别名仍然有效。

MonitoredQueue会在7777端口的zmq.PUB socket上发布每条经过的消息。这些消息会以“in”和“out”作为前缀,并且还会包含一个身份字符串。这条身份字符串是由ROUTER socket分配的,在交换过程中对所有连接的客户端都是唯一的。这个身份是所谓的信封的一部分,并且是由空帧分隔的请求/响应消息(稍后会看到)。

monitorclient.py

这个监控客户端只是为了展示如何获取监控到的信息。

它订阅了由监控器(MonitoredQueue)服务的7777端口,并将其打印出来。重要的是要处理多部分消息,否则我们会错过一些信息。

import zmq

def monitorclient(server_url="tcp://localhost:7777"):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(server_url)
    socket.setsockopt(zmq.SUBSCRIBE, "")
    print "started monitoring client"

    while True:
        res = socket.recv_multipart()
        print res

if __name__ == "__main__":
    monitorclient()

运行它

我们需要打开4个控制台,每个控制台启动一个python脚本。

首先启动服务器:

$ python server.py

启动MonitoredQueue

$ python monitor.py

启动客户端,读取监控到的消息

$ python monitorclient.py

最后,启动客户端,尝试从通过MonitoredQueue代理的服务器获取一些响应

$ python client.py
request 0
0 result:  0
request 1
1 result:  2
request 2
2 result:  4
request 3
3 result:  6
request 4
4 result:  8

结果如预期。

现在检查server.py的输出:

$ python server.py
server received request 0
server replied with 0
server received request 1
server replied with 2
server received request 2
server replied with 4
server received request 3
server replied with 6
server received request 4
server replied with 8

没什么意外,一切正常。

我们的monitor.py没有打印任何内容,我们需要查看monitorclient.py的输出

$ python monitorclient.py 
started monitoring client
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '1']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '3']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '6']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '8']

在这里你可以看到所有10条消息的打印,5条请求,5条响应。

每条消息的结构是[prefix, identity, emptyframe, message],其中:

  • prefix是“in”或“out”
  • identity是由MonitoredQueue分配给特定客户端的字符串。每次客户端连接时,这个身份可能会改变。作为额外的好处,我们可以连接多个客户端,仍然能够区分不同的客户端。如果你需要特定的客户端身份,可以查看client.py中注释掉的行socket.setsockopt(zmq.IDENTITY, "client_id_abc")。如果你取消注释,你会看到"client_id_abc"作为你的客户端身份。
  • emptyframe被视为'',用于分隔信封和消息数据。
  • message是客户端请求的内容或服务器的响应。

结论

  • 监控工作正常,PyZMQ已经提供了MonitoredQueue设备来实现这个目的。
  • 使用zmq.PUB进行监控不会阻塞任何通信,你可以简单地忽略监控到的数据,所有功能仍然正常。
  • 在生产环境中,将MonitoredQueue作为系统的固定部分是实用的,这样可以绑定到已知的IP地址和端口。这需要对服务器进行一些更改,服务器需要连接(而不是当前的绑定)。这样的更改是微不足道的,不会影响其余代码和行为。如果你只有一个端点需要监控,你也可以将监控嵌入到服务器中(这需要两个线程,一个用于服务器,另一个用于监控)。
  • zmq是处理这类任务的优秀“乐高”。

撰写回答