如何在zmq的推/拉模式中设置hwm?
我发现了一个类似的问题,ZeroMQ: PUSH的HWM不起作用,但这个问题没有解决我的困扰。
我想控制推送套接字排队的消息数量,但它还是会排队1000条消息。
所以我想知道如何设置推送套接字的高水位线(hwm)。提前谢谢你们。
我的环境是:libzmq 4.0.4,pyzmq 14.1.0,python 3.3
这是我的代码:
server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import random
import zmq
class TestPush(object):
def __init__(self):
self.ctx = zmq.Context()
random.seed()
def run(self):
task_snd = self.ctx.socket(zmq.PUSH)
task_snd.setsockopt(zmq.SNDHWM, 10)
task_snd.bind('tcp://*:53000')
while True:
workload = str(random.randint(1, 100))
task_snd.send(workload.encode('utf-8'))
print('Send {0}'.format(workload))
if __name__ == '__main__':
test_push = TestPush()
test_push.run()
client.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import random
import zmq
class TestPull(object):
def __init__(self):
self.ctx = zmq.Context()
def run(self):
task_rcv = self.ctx.socket(zmq.PULL)
task_rcv.setsockopt(zmq.RCVHWM, 1)
task_rcv.connect('tcp://localhost:53000')
while True:
msg = task_rcv.recv()
print('Receive msg: {0}'.format(msg))
time.sleep(random.randint(2, 3))
if __name__ == '__main__':
test_pull = TestPull()
test_pull.run()
1 个回答
我遇到了一个和ZeroMQ有关的类似问题,当我尝试在推送和拉取的套接字上设置HWM(高水位标记)时,甚至在发布和订阅的套接字上也不管用。我想解释一下发生了什么,以及我是怎么解决这个问题的。
我写了两个脚本,第一个是发送者,使用推送套接字,第二个是接收者,使用拉取套接字。我在两个套接字上都设置了HWM为10。在接收者的脚本中,我在每次收到消息后设置了5秒的延迟。然后我运行了发送者的脚本,循环发送100条消息(在接收者保持运行以接收消息的情况下)。
我期待的结果:
接收者的队列和发送者的队列都会达到高水位标记。之后,发送者会停止发送更多的消息。
但实际发生了什么:
发送者发送了所有100条消息后就退出了。但是接收者却持续处理消息,直到收到所有的消息,花了很长时间。
经过研究,我找到了原因:
有一个叫做内核套接字缓冲区的东西,它位于发送者套接字和接收者套接字之间。每当一个进程打开一个套接字时,内核会为TCP套接字分配内存空间,默认是128KB。这个内核套接字缓冲区适用于发送者和接收者的套接字(所以总的缓冲区就是128KB + 128KB)。我的消息大小是以字节为单位的(一个带有一些字符的整数)。因此,总的消息缓冲区如下:
总的消息缓冲区 = 发送者套接字的高水位标记 + 发送者套接字的内核缓冲区(128KB) + 接收者套接字的高水位标记 + 接收者套接字的内核缓冲区(128KB)
解决方案:
现在,我把消息长度改为稍微超过1KB。然后再次进行测试,发现大约发送了260条消息(如预期),之后发送者会在接收者接收到一些消息后暂停,再继续发送。
附加信息
为了让推送套接字在接收者无法接收时仍然可以继续发送消息,我们可以在发送过程中使用NOBLOCK选项,但这样接收者丢失的消息数量会大幅增加。因此,更好的选择是使用轮询或超时,然后再用NOBLOCK选项调用发送过程。
请注意,你可以使用zeromq的SNDBUFF/RCVBUFF,但操作系统可能不会遵循这个设置(就像我遇到的情况,它并没有生效)。