使用pika将所有消息从一个队列重新排队到另一个队列
我需要遍历队列中的所有消息,然后关闭回调。我要确保一旦队列为空,就停止处理消息。
所以我正在把一队消息从一个队列写入另一个队列。
creds = pika.PlainCredentials(app.config['mq.user'], app.config['mq.pass'])
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=app.config['mq.host'],
credentials=creds))
connection2 = pika.BlockingConnection(pika.ConnectionParameters(
host=app.config['mq.host'],
credentials=creds))
channel = connection.channel()
channel2 = connection2.channel()
Def requeue_callback(ch, method, properties, body):
try:
msg = json.loads(body)
ch2.basic_publish(exchange='',
routing_key=base_queue+'.request',
body = msg['orig_msg'])
finally:
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(requeue_callback,
queue=base_queue+'.error')
channel.start_consuming()
*另外,我也可以先找出队列中有多少条消息,然后处理这个特定数量的消息。在这种情况下,我该如何将特定数量的消息重新放回队列呢?
1 个回答
0
在你的回调函数里,你可以用 passive=True
来声明一个队列,然后用结果来获取消息的数量。
比如说:
>>> ch = rabbit.connection.channel()
>>> method = ch.queue_declare('sandbox', passive=True)
>>> method.method.message_count
23
>>>
接着检查这个数字是否为0。