Python ZeroMQ PUSH/PULL -- 消息丢失?
我正在尝试用 python
和 zeroMQ
在 PUSH / PULL
模式下发送消息,每隔几秒发送一次,消息大小是4[MB]。
但不知为何,虽然看起来所有消息都已发送,但只有部分消息被服务器接收到。我漏掉了什么吗?
这是客户端的代码 -- client.py
import zmq
import struct
# define a string of size 4[MB]
msgToSend = struct.pack('i', 45) * 1000 * 1000
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:5000")
# print the message size in bytes
print len(msgToSend)
socket.send(msgToSend)
print "Sent message"
这是服务器的代码 -- server.py
import zmq
import struct
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://127.0.0.1:5000")
while True:
# receive the message
msg = socket.recv()
print "Message Size is: {0} [MB]".format( len(msg) / (1000 * 1000) )
我漏掉了什么?怎么才能确保消息总是发送出去而不会丢失呢?
顺便提一下,我使用的是 Ubuntu 10.04
32位的机器,配备2[GB]内存。
注意:我用 RabbitMQ
做了同样的例子,结果一切正常,没有消息丢失。我很困惑,因为我常听人夸 zeroMQ
。为什么它在这里失败,而 RabbitMQ
却成功了呢?
2 个回答
1
有可能你遇到内存不足的问题,这取决于你发送消息的方式,以及消息是否被及时处理等等。你可以使用 socket.setsockopt(zmq.HWM)
来设置 HWM
的值,这样可以防止 zeromq 在发送缓冲区中存储过多的消息,保持一个合理的数量。考虑一下稍微修改过的例子:
# server
...
counter = 0
while True:
...receive the message
counter += 1
print "Total messages recieved: {0}".format(counter)
# client
socket.setsockopt(zmq.HWM, 8)
for i in range(1000):
socket.send(msgToSend)
然后运行10个测试客户端:
for i in {1..10}; do
python client.py &
done
从服务器上你可以看到所有的消息都被接收到了:
Total messages recieved: 9998
Total messages recieved: 9999
Total messages recieved: 10000
16
问题在于,当程序退出时,套接字会立刻关闭,并且会被垃圾回收,这样就相当于设置了一个有效的 LINGER 值为 0(也就是说,它会丢弃任何未发送的消息)。对于较大的消息来说,这就成了一个问题,因为发送这些消息所需的时间比套接字被垃圾回收的时间要长。
你可以通过在程序退出前加一个 sleep(0.1)
来解决这个问题(这样可以延迟套接字和上下文被垃圾回收)。
socket.setsockopt(zmq.LINGER, -1)
(这是默认设置)应该能避免这个问题,但出于某种原因,它并没有解决,我也没有时间去调查这个原因。