多个工作者消费RabbitMQ队列中的相同消息

1 投票
1 回答
4293 浏览
提问于 2025-04-18 03:36

我使用的是py-amqp模块和Python 3.4。当我运行多个监听器,并启动一个生产者来发布消息时,这些监听器会同时接收到一条消息并开始处理。我并不希望出现这种情况,因为消息应该只写入数据库一次。所以,最快的工作者会把消息写入数据库,而其他工作者则会说这条消息已经存在。

生产者:

import json
import amqp
import random
from application.settings import RMQ_PASSWORD, RMQ_USER, RMQ_HOST, RMQ_EXCHANGE

def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    req = {"request": {"transaction_number": random.randint(100000, 9999999999)}}
    message = json.dumps(req)    
    msg = amqp.Message(message)    
    ch.basic_publish(msg, RMQ_EXCHANGE)    
    ch.close()
    conn.close()

if __name__ == '__main__':
    for x in range(100):
        main()

工作者:

from functools import
from pipeline import pipeline, dal
from settings import DB_CONNECTION_STRING, RMQ_EXCHANGE, RMQ_HOST, RMQ_PASSWORD, RMQ_USER    
import amqp


DB = dal.DAL(DB_CONNECTION_STRING)
message_processor = pipeline.Pipeline(DB)


def callback(channel, msg):
    channel.basic_ack(msg.delivery_tag)
    message_processor.process(msg)

    if msg.body == 'quit':
        channel.basic_cancel(msg.consumer_tag)


def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, RMQ_EXCHANGE)
    ch.basic_consume(qname, callback=partial(callback, ch))
    while ch.callbacks:
        ch.wait()
    ch.close()
    conn.close()

if __name__ == '__main__':
    print('Listener starting')
    main()

还有:

user@RabbitMQ:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
        exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue   amq.gen--crTjfeSlue6gw0LRwW7pQ  []
        exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue   amq.gen-1X3vwGF5OKn_gcnofpJKFg  []
...
        exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue   amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  []
        exchange        entryapi.test   queue   entryapi.test   []
entryapi        exchange        entryapi.test   queue           []
azaza   exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue           []
azaza   exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue           []
...
azaza   exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue           []
azaza   exchange        entryapi.test   queue           []
...done.

1 个回答

2

我觉得你用的设置不太适合你的需求。你有一个发布者在往一个交换机(exchange)发送消息,而你想把这些消息读出来并写入数据库。你希望有很多消费者同时写入数据库,以提高处理速度。使用“扇出交换机”(fanout exchanges)会把消息复制,这样多个队列和消费者就会导致同样的数据多次写入数据库。你需要使用“工作队列”(Work Queues)。每个交换机可以是默认的(没有类型,或者是直接交换机,所有消息都使用相同的路由键)交换机。所有发送到交换机的消息都会被导向一个单独的队列。每个队列会有多个消费者。每条消息只会被你的消费者组中的一个消费者读取一次,然后只会写入数据库一次。

想了解更多,可以查看这里 http://www.rabbitmq.com/tutorials/tutorial-two-python.html

撰写回答