Python中使用ZMQ的客户端/服务器配置中的嗅探器/监视器
我用ZeroMQ实现了一个客户端/服务器的通信,现在想加一个监控工具,来捕捉这两者之间的交流。
client <---------> server
(REQ) | (REP)
|
|
v
sniffer <-this is what I want to add
假设客户端和服务器是通过5555这个端口进行通信的,我该如何添加一个监控工具来监听同一个端口呢?有没有办法区分哪些消息是来自客户端,哪些是来自服务器的?有没有人能分享一下经验?
根据Jan的回答进行了编辑
配置会变成这样:
client [REQ]----->[ROUTER:4444] monitor [DEALER]------->[REP:5555] server
[PUB:7777]
^
|
|
|
|
[SUB]
monitorclient(sniffer) <-this is what I want to add
箭头表示连接的方向(指向绑定的端口)。
消息的流动是这样的:
客户端
->监控工具
->服务器
->监控工具
->客户端
- 还有
监控工具
->监控客户端
这里有一张更清晰的图 在这里。
1 个回答
为了进行数据监控,我们需要一个中间部分。
zmq提供了几种选择:
- 自己写一个程序,一边接收请求,一边发送出去,获取响应后再发送给原请求者,同时把这些流量记录下来。
- 使用zmq.proxy - 不过,这需要最新版本的
libzmq
(即zmq.zmq_version_info() >= 3),而我现在的Ubuntu 14.04上还没有这个版本,所以我就不考虑这个了。 - 使用MonitoredQueue - 这可能是你想要的。这提供了一个循环,可以在前端和后端之间交换消息,同时把它们发布到另一个socket。
计划
这个解决方案基于pyzmq文档中的MonitoredQueue示例
服务器绑定到5555端口
服务器将绑定到5555端口。与其他示例不同,我会把你的服务器作为固定部分,不会改变它与MonitoredQueue的连接。不过,这样的交换并不会造成问题,只要你正确调整MonitoredQueue即可。
MonitoredQueue绑定到4444端口,连接到5555端口,并在7777端口发布流量
MonitoredQueue位于客户端和服务器之间。它监听4444端口,向服务器发送请求,并将响应返回给客户端。同时,任何经过的消息都会在PUB socket上以“in”或“out”作为前缀进行发布。稍后我们会看到,这些消息不仅包含前缀和请求/响应,还包含客户端的身份信息。
客户端连接到4444端口
客户端可以直接连接到5555端口的服务器,但这样就无法监控流量。因此,我们将客户端连接到4444端口,MonitoredQueue在这里等待服务器并进行监控。
你会发现,客户端和服务器不需要修改任何代码就能参与这个交换。
实际代码
server.py
在我们的例子中,服务器期望一个可以转换为整数的字符串,并返回一个值加倍后的字符串。
import zmq
def double_server(server_url="tcp://*:5555"):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(server_url)
print "server started..."
while True:
req = socket.recv()
print "server received request", req
result = str(2*int(req))
socket.send(result)
print "server replied with", result
if __name__ == "__main__":
double_server()
client.py
我们的客户端将尝试5次在本地的4444端口请求一些结果。
import zmq
def client(server_url="tcp://localhost:4444"):
context = zmq.Context()
socket = context.socket(zmq.REQ)
# socket.setsockopt(zmq.IDENTITY, "client_id_abc") # see Conclusions
socket.connect(server_url)
for i in range(5):
print "request", i
socket.send(str(i))
res = socket.recv()
print i, "result: ", res
if __name__ == "__main__":
client()
你可以现在尝试直接连接到5555端口,看看是否能工作,但为了监控,我们必须让它与MonitoredQueue通信。
monitor.py
这里是所有的魔法所在。pyzmq
已经提供了设备MonitoredQueue
,所以我们可以直接使用它。
import zmq
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes
def monitoredqueue(frontend_url="tcp://*:4444", server_url="tcp://localhost:5555", capture_url="tcp://*:7777"):
mondev = MonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, asbytes("in"), asbytes("out"))
mondev.bind_in(frontend_url)
mondev.connect_out(server_url)
mondev.bind_mon(capture_url)
mondev.setsockopt_in(zmq.HWM, 1)
mondev.start()
print "monitored queue started"
if __name__ == "__main__":
monitoredqueue()
关于socket类型和别名的说明:
- zmq.ROUTER以前叫做zmq.XREP
- zmq.DEALER以前叫做zmq.XREQ
- 这些别名仍然有效。
MonitoredQueue会在7777端口的zmq.PUB socket上发布每条经过的消息。这些消息会以“in”和“out”作为前缀,并且还会包含一个身份字符串。这条身份字符串是由ROUTER socket分配的,在交换过程中对所有连接的客户端都是唯一的。这个身份是所谓的信封的一部分,并且是由空帧分隔的请求/响应消息(稍后会看到)。
monitorclient.py
这个监控客户端只是为了展示如何获取监控到的信息。
它订阅了由监控器(MonitoredQueue)服务的7777端口,并将其打印出来。重要的是要处理多部分消息,否则我们会错过一些信息。
import zmq
def monitorclient(server_url="tcp://localhost:7777"):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(server_url)
socket.setsockopt(zmq.SUBSCRIBE, "")
print "started monitoring client"
while True:
res = socket.recv_multipart()
print res
if __name__ == "__main__":
monitorclient()
运行它
我们需要打开4个控制台,每个控制台启动一个python脚本。
首先启动服务器:
$ python server.py
启动MonitoredQueue
$ python monitor.py
启动客户端,读取监控到的消息
$ python monitorclient.py
最后,启动客户端,尝试从通过MonitoredQueue代理的服务器获取一些响应
$ python client.py
request 0
0 result: 0
request 1
1 result: 2
request 2
2 result: 4
request 3
3 result: 6
request 4
4 result: 8
结果如预期。
现在检查server.py的输出:
$ python server.py
server received request 0
server replied with 0
server received request 1
server replied with 2
server received request 2
server replied with 4
server received request 3
server replied with 6
server received request 4
server replied with 8
没什么意外,一切正常。
我们的monitor.py
没有打印任何内容,我们需要查看monitorclient.py
的输出
$ python monitorclient.py
started monitoring client
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '1']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '3']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '6']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '8']
在这里你可以看到所有10条消息的打印,5条请求,5条响应。
每条消息的结构是[prefix, identity, emptyframe, message]
,其中:
prefix
是“in”或“out”identity
是由MonitoredQueue分配给特定客户端的字符串。每次客户端连接时,这个身份可能会改变。作为额外的好处,我们可以连接多个客户端,仍然能够区分不同的客户端。如果你需要特定的客户端身份,可以查看client.py
中注释掉的行socket.setsockopt(zmq.IDENTITY, "client_id_abc")
。如果你取消注释,你会看到"client_id_abc"
作为你的客户端身份。emptyframe
被视为''
,用于分隔信封和消息数据。message
是客户端请求的内容或服务器的响应。
结论
- 监控工作正常,PyZMQ已经提供了MonitoredQueue设备来实现这个目的。
- 使用
zmq.PUB
进行监控不会阻塞任何通信,你可以简单地忽略监控到的数据,所有功能仍然正常。 - 在生产环境中,将MonitoredQueue作为系统的固定部分是实用的,这样可以绑定到已知的IP地址和端口。这需要对服务器进行一些更改,服务器需要连接(而不是当前的绑定)。这样的更改是微不足道的,不会影响其余代码和行为。如果你只有一个端点需要监控,你也可以将监控嵌入到服务器中(这需要两个线程,一个用于服务器,另一个用于监控)。
zmq
是处理这类任务的优秀“乐高”。