这个模型能在ZeroMQ中实现吗?

2024-04-25 04:55:03 发布

您现在位置:Python中文网/ 问答频道 /正文

Model:

其中A在理论上“起”服务器的作用,其中DE既是订阅者又是发布者,而F是订阅者。你知道吗

举个例子,如果确实可行的话,我们将不胜感激。你知道吗

如果没有,请提供替代方案。WebSocket不是我的目标。你知道吗


Tags: 服务器目标方案理论例子websocket发布者
1条回答
网友
1楼 · 发布于 2024-04-25 04:55:03

当然可以:

最佳
首先阅读[ZeroMQ hierarchy in less than a five seconds]部分中的主要概念差异。你知道吗

为了简单起见,我们假设只有一个tcp://传输类和具有单个IP地址的节点(很容易转换为任何位于同一位置的情况和/或不同的传输类组合)。你知道吗

对于任何较大的卷和/或严格的延迟管理情况,性能调整都是必须的。你知道吗

节点A:

import time, zmq; aCTX = zmq.Context(); aPUB = aCTX.socket( zmq.PUB )
pass;                                   aPUB.bind( "tcp:10.0.0.1:12345" )
pass;                                   aPUB.setsockopt( zmq.CONFLATE, 1 )
pass;                                   print( "A: Started. Can Ctrl+C." )
while True:
      try:
           aPUB.send( "A: sending ...[{0:}]".format( time.ctime() ); time.sleep( 1 )
      except KeyboardInterrupt:
           pass;                        print( "A: Ctrl+C'd. Will terminate" ); break
pass;
aPUB.close()
aCTX.term()

节点-B(,C):

import time, zmq; aCTX = zmq.Context(); aPUB = aCTX.socket( zmq.PUB )
pass;                                   aPUB.bind( "tcp:10.0.0.10:23456" )
pass;                                   aPUB.setsockopt( zmq.CONFLATE, 1 )
pass;                                   aSUB = aCTX.socket( zmq.SUB )
pass;                                   aSUB.connect( "tcp://10.0.0.1:12345" )
pass;                                   aSUB.setsockopt( zmq.LINGER, 0 )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "A:" )
pass;                                   print( "B: Started. Can Ctrl+C." )
while True:
      try:
           aPUB.send( "B: sending ...[{0:}]".format( time.ctime() );
           if ( 0 != aSUB.poll(  500, zmq.POLLIN ) ):
               print( "B:recv()'d: {0:} at {1:}".format( aSUB.recv( zmq.NOBLOCK ), time.ctime() )

      except KeyboardInterrupt:
           pass;                        print( "B: Ctrl+C'd. Will terminate" ); break
pass;
aPUB.close()
aSUB.close()
aCTX.term()

节点-D(,E):

import time, zmq; aCTX = zmq.Context(); aPUB = aCTX.socket( zmq.PUB )
pass;                                   aPUB.bind( "tcp:10.0.0.100:34567" )
pass;                                   aPUB.setsockopt( zmq.CONFLATE, 1 )
pass;                                   aSUB = aCTX.socket( zmq.SUB )
pass;                                   aSUB.connect( "tcp://10.0.0.10:23456" )
pass;                                   aSUB.connect( "tcp://10.0.0.20:23456" )
pass;                                   aSUB.setsockopt( zmq.LINGER, 0 )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "B:" )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "C:" )
pass;                                   print( "D: Started. Can Ctrl+C." )
while True:
      try:
           aPUB.send( "D: sending ...[{0:}]".format( time.ctime() )
           if ( 0 != aSUB.poll(  250, zmq.POLLIN ) ):
               print( "D:recv()'d: {0:} at {1:}".format( aSUB.recv( zmq.NOBLOCK ), time.ctime() )
      except KeyboardInterrupt:
           pass;                        print( "D: Ctrl+C'd. Will terminate" ); break
pass;
aPUB.close()
aSUB.close()
aCTX.term()

节点F:

import time, zmq; aCTX = zmq.Context(); aSUB = aCTX.socket( zmq.SUB )
pass;                                   aSUB.connect( "tcp://10.0.0.100:34567" )
pass;                                   aSUB.connect( "tcp://10.0.0.200:34567" )
pass;                                   aSUB.setsockopt( zmq.LINGER, 0 )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "D:" )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "E:" )
pass;                                   print( "F: Started. Can Ctrl+C." )
while True:
      try:
           print( "F:recv()'d: {0:} at {1:}".format( aSUB.recv(), time.ctime() )
      except KeyboardInterrupt:
           pass;                        print( "F: Ctrl+C'd. Will terminate" ); break
pass;
aSUB.close()
aCTX.term()

相关问题 更多 >