Python ZeroMQ PUSH/PULL -- 消息丢失?

8 投票
2 回答
8681 浏览
提问于 2025-04-16 21:34

我正在尝试用 pythonzeroMQPUSH / PULL 模式下发送消息,每隔几秒发送一次,消息大小是4[MB]

但不知为何,虽然看起来所有消息都已发送,但只有部分消息被服务器接收到。我漏掉了什么吗?

这是客户端的代码 -- client.py

import zmq
import struct

# define a string of size 4[MB] 
msgToSend = struct.pack('i', 45) * 1000 * 1000 

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:5000")

# print the message size in bytes
print len(msgToSend)

socket.send(msgToSend)

print "Sent message"

这是服务器的代码 -- server.py

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://127.0.0.1:5000")

while True:
    # receive the message
    msg = socket.recv()

    print "Message Size is: {0} [MB]".format( len(msg) / (1000 * 1000) )

我漏掉了什么?怎么才能确保消息总是发送出去而不会丢失呢?

顺便提一下,我使用的是 Ubuntu 10.04 32位的机器,配备2[GB]内存。

注意:我用 RabbitMQ 做了同样的例子,结果一切正常,没有消息丢失。我很困惑,因为我常听人夸 zeroMQ。为什么它在这里失败,而 RabbitMQ 却成功了呢?

2 个回答

1

有可能你遇到内存不足的问题,这取决于你发送消息的方式,以及消息是否被及时处理等等。你可以使用 socket.setsockopt(zmq.HWM) 来设置 HWM 的值,这样可以防止 zeromq 在发送缓冲区中存储过多的消息,保持一个合理的数量。考虑一下稍微修改过的例子:

# server
...
counter = 0
while True:
    ...receive the message
    counter += 1
    print "Total messages recieved: {0}".format(counter)

# client
socket.setsockopt(zmq.HWM, 8)
for i in range(1000):
    socket.send(msgToSend)

然后运行10个测试客户端:

for i in {1..10}; do
    python client.py &
done

从服务器上你可以看到所有的消息都被接收到了:

Total messages recieved: 9998
Total messages recieved: 9999
Total messages recieved: 10000
16

问题在于,当程序退出时,套接字会立刻关闭,并且会被垃圾回收,这样就相当于设置了一个有效的 LINGER 值为 0(也就是说,它会丢弃任何未发送的消息)。对于较大的消息来说,这就成了一个问题,因为发送这些消息所需的时间比套接字被垃圾回收的时间要长。

你可以通过在程序退出前加一个 sleep(0.1) 来解决这个问题(这样可以延迟套接字和上下文被垃圾回收)。

socket.setsockopt(zmq.LINGER, -1)(这是默认设置)应该能避免这个问题,但出于某种原因,它并没有解决,我也没有时间去调查这个原因。

撰写回答