我试图对在PUB/SUB
套接字方案中实现的ZeroMQ publish-subscribe原型的性能进行基准测试,但有一个相当大的问题。我会尽快发布消息,然后尽快订阅和处理。在
出版商:
import zmq
import time
MAX = 100000
context = zmq.Context()
pub_sock = context.socket(zmq.PUB)
pub_sock.bind("ipc:///home/pub")
time.sleep(1)
var1 = 1
starttime = time.time()
while var1 != MAX+1:
pub_sock.send_string('Msg: dummy message')
var1 += 1
totaltime = time.time() - starttime
print "total time: %s" % totaltime
print "Message/second: %s" % float(MAX/totaltime)
订户:
^{pr2}$问题是有时发布者比订阅者快得多,队列将达到最高水位线,我将丢失消息。如果发生这种情况,订户环路将永远不会退出,然后我将需要使用键盘中断来结束它,并找出收到的消息数。在
我不想增加HWM,因为默认值1000也是我希望在实际软件中使用的,因此在基准测试期间,我更希望将其保持在1000。在
我已经看过ZMQ的内置性能测试脚本(local_thr
,remote_thr
脚本在perf
文件夹中),但它们是用C编写的(I want python script performance test),而且它们还使用PUSH-PULL
套接字,而不是PUB-SUB
套接字。一旦到达HWM,PUSH-PULL
套接字就会阻塞,因此ZMQ性能测试脚本不会遇到由于丢弃而丢失消息的问题。它只是减慢了过程。在
我希望有类似的行为,但是使用PUB-SUB
套接字。在
显然,ZeroMQ中没有规定在达到HWM时将PUB-SUB
的行为更改为阻塞。在
是否有其他方法实现流量控制?
一个相关的问题:
如果我运行一个发布者和一个订阅者,消息很少会丢失。
当我运行1个发布服务器和3个订阅服务器(所有订阅服务器订阅同一个主题)时,大约10%的消息丢失(在所有订阅服务器上)。我试图通过在循环中引入伪语句来限制发布者,为了在所有订阅者中实现不丢失消息,我不得不减少将近1/3。Is this normal?
看起来消息一次只传递给一个订阅者(而不是并行传递),因此花费了三倍的时间。在
谢谢。在
目前没有回答
相关问题 更多 >
编程相关推荐