使用pika将所有消息从一个队列重新排队到另一个队列

0 投票
1 回答
1642 浏览
提问于 2025-04-19 21:22

我需要遍历队列中的所有消息,然后关闭回调。我要确保一旦队列为空,就停止处理消息。

所以我正在把一队消息从一个队列写入另一个队列。

creds = pika.PlainCredentials(app.config['mq.user'], app.config['mq.pass']) connection = pika.BlockingConnection(pika.ConnectionParameters( host=app.config['mq.host'], credentials=creds)) connection2 = pika.BlockingConnection(pika.ConnectionParameters( host=app.config['mq.host'], credentials=creds))

    channel = connection.channel()
    channel2 = connection2.channel()

Def requeue_callback(ch, method, properties, body):
try:
    msg = json.loads(body)
    ch2.basic_publish(exchange='',
                      routing_key=base_queue+'.request',
                      body = msg['orig_msg'])
finally:
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(requeue_callback,
                  queue=base_queue+'.error')

channel.start_consuming()

*另外,我也可以先找出队列中有多少条消息,然后处理这个特定数量的消息。在这种情况下,我该如何将特定数量的消息重新放回队列呢?

1 个回答

0

在你的回调函数里,你可以用 passive=True 来声明一个队列,然后用结果来获取消息的数量。

比如说:

>>> ch = rabbit.connection.channel()
>>> method = ch.queue_declare('sandbox', passive=True)
>>> method.method.message_count
23
>>> 

接着检查这个数字是否为0。

撰写回答