使用Rabbitmq和Python进行Stomp广播

2 投票
2 回答
4655 浏览
提问于 2025-04-15 12:10

我正在尝试把一个系统从使用 morbid 转移到 rabbitmq,但我发现无法实现 morbid 默认提供的那种广播行为。这里的广播是指,当有消息加入队列时,每个消费者都能收到这条消息。而在 rabbitmq 中,当消息被添加时,它们是以轮询的方式分配给每个监听者的。

有没有人能告诉我怎么才能实现相同的消息分发方式呢?

下面使用的 stomp 库是 http://code.google.com/p/stomppy/

如果用 stomp 无法做到,即使是 amqplib 的示例也会很有帮助。

我现在的代码看起来是这样的:

消费者部分

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demoqueue', ack='auto')

while True:
    pass
conn.disconnect()

发送者部分看起来是这样的:

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demotopic', ack='auto')

while True:
    pass
conn.disconnect()

2 个回答

3

显然,直接用 STOMP 是不行的;有一个邮件列表的讨论,里面讲了要让广播功能在 STOMP 中正常工作,你需要做的很多复杂的事情(这涉及一些更底层的 AMPQ 内容)。

3

我终于搞明白了怎么做,就是为每个“接收组”创建一个交换机。我不太确定Rabbit在处理成千上万的交换机时表现如何,所以你可能需要在正式使用之前好好测试一下。

在发送代码中:

conn.send(str(i), exchange=exchange, destination='')

空白的目标是必须的,我只关心把消息发送到那个交换机。

接收时:

import stomp
import sys
from amqplib import client_0_8 as amqp
#read in the exchange name so I can set up multiple recievers for different exchanges to tset
exchange = sys.argv[1]
conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
 virtual_host="/", insist=False)

chan = conn.channel()

chan.access_request('/', active=True, write=True, read=True)

#declare my exchange
chan.exchange_declare(exchange, 'topic')
#not passing a queue name means I get a new unique one back
qname,_,_ = chan.queue_declare()
#bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")
headers = {}

#subscribe to the queue
conn.subscribe(destination=qname, ack='auto')

while True:
    pass
conn.disconnect()

撰写回答