在Pika中获取队列大小(AMQP Python)
这是个简单的问题,不过在谷歌或者Pika的开源代码里没有找到答案。请问有没有办法查询Pika当前队列的大小(也就是里面有多少个项目)?
6 个回答
这里是如何使用pika来获取队列长度的方法(假设你在本地使用默认的用户名和密码)。请把q_name替换成你的队列名称。
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count
我知道这个问题有点老了,但这里有一个用pika实现的例子。
关于AMQP和RabbitMQ,如果你已经创建了一个队列,你可以用一个叫做passive标志重新声明这个队列,同时保持其他队列参数不变。这个声明的响应declare-ok会告诉你队列里有多少消息。
下面是一个使用pika 0.9.5的例子:
import pika
def on_callback(msg):
print msg
params = pika.ConnectionParameters(
host='localhost',
port=5672,
credentials=pika.credentials.PlainCredentials('guest', 'guest'),
)
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(parameters=params)
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False
)
# ...
# Re-declare the queue with passive flag
res = channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False,
passive=True
)
print 'Messages in queue %d' % res.method.message_count
这段代码会打印出以下内容:
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
Messages in queue 0
你可以从message_count
这个成员中获取消息的数量。
在AMQP协议中,有两种方法可以获取队列的大小。你可以使用Queue.Declare或者Basic.Get。
如果你是通过Basic.Consume来实时消费消息,那么你就无法获取这个信息,除非你断开连接(超时)然后重新声明队列,或者获取一条消息但不确认(ack)。在AMQP的新版本中,你可以主动将消息重新放回队列。
至于Pika,我不太清楚具体情况,但Python的AMQP客户端一直让我感到困扰。通常你需要对类进行一些修改,才能获取你需要的信息,或者让队列消费者超时,这样你就可以在定期的间隔内做其他事情,比如记录统计数据或查看队列中有多少消息。
解决这个问题的另一种方法是放弃,使用Pipe类来运行 sudo rabbitmqctl list_queues -p my_vhost
。然后解析输出,找出所有队列的大小。如果你这样做,你需要配置 /etc/sudoers
,以便不询问通常的sudo密码。
我希望有其他对Pika更有经验的人能回答这个问题,告诉你如何完成我提到的所有事情,那样我就可以下载Pika试试看了。但如果没有人回应,而你在对Pika代码进行修改时遇到困难,那么可以看看 haigha
。我发现他们的代码比其他Python AMQP客户端库更简单,因为他们更贴近AMQP协议。