如何让Rabbit MQ队列在存储转发模式下运行?

1 投票
1 回答
1309 浏览
提问于 2025-04-18 17:32

我正在使用Python的Pika客户端实验Rabbit MQ。我想让我的AMQP发送器能够在“存储和转发”模式下工作,也就是说,当服务器或网络出现故障时,可以开始将消息排队,等到恢复后再可靠地发送这些消息。请问我该怎么做?下面是我的amqp-sender.py代码:



    import pika
    import psutil
    import time
    import datetime
    import log
    import json
    import logging
    import uuid
    from dateutil.tz import tzlocal

    logging.basicConfig()
    logger = log.setup_custom_logger('amqp_send', 'amqp_send.log')

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='54.191.161.213'))
    channel = connection.channel()
    channel.confirm_delivery()

    channel.queue_declare(queue='ems.data')

    def get_mac_address():
        return ':'.join(['{:02x}'.format((uuid.getnode() >> i) & 0xff) for i in range(0,8*6,8)][::-1])

    while True:
        now = datetime.datetime.now(tzlocal())
        timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
        data = {
            'timestamp':timestamp,
            'systemId':get_mac_address(),
            'cpuPct':psutil.cpu_percent(),
            'memoryUsed':psutil.virtual_memory().used
        }
        msg=json.dumps(data)
        delivered=channel.basic_publish(exchange='', routing_key='abc', body=msg, mandatory=True)
        if delivered:
            logger.info("delivered %s" % msg)
        else:
            logger.error('failed to deliver %s' % msg)
        time.sleep(1)

    connection.close()

1 个回答

2

你需要使用 channel.confirm_delivery() 并设置 mandatory=True

confirm_delivery 会返回一个布尔值,告诉你消息是否被 Rabbit 正确处理。

mandatory 标志:

这个标志告诉服务器,如果消息无法被路由到队列,该怎么处理。具体来说,如果设置了 mandatory,并且在运行绑定后消息没有放到任何队列上,那么消息会被返回给发送者(通过基本的返回机制)。如果在同样的情况下没有设置 mandatory,服务器就会默默地丢掉这条消息。

所以你会得到类似这样的代码:

channel.confirm_delivery()
delivered = channel.basic_publish(exchange='', routing_key='ems.data', body=msg, mandatory=True)
if not delivered:
    # store message for later reprocessing

撰写回答