等待单个RabbitMQ消息并设置超时

12 投票
5 回答
17042 浏览
提问于 2025-04-15 22:32

我想给一个RabbitMQ服务器发送一条消息,然后等着接收一条回复消息(在一个叫“reply-to”的队列里)。当然,我不想一直等下去,以防处理这些消息的应用程序出现问题——所以需要设置一个超时时间。这听起来是个很简单的任务,但我找不到办法来实现。现在我在使用py-amqplibRabbitMQ .NET客户端时都遇到了这个问题。

到目前为止,我找到的最好解决方案是使用basic_get进行轮询,中间加上sleep,但这样做看起来很糟糕:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

难道没有更好的办法吗?

5 个回答

2

这里有一个例子,使用了qpid,可以参考这个链接。在这个例子中,有一行代码是msg = q.get(timeout=1),应该能满足你的需求。抱歉,我不太清楚还有哪些AMQP客户端库支持超时功能,特别是你提到的那两个具体的库。

10

我在.NET客户端中最终做了这些:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}

不过,我在使用py-amqplib时就没办法这样做,因为它的basic_consume方法不会自动调用回调函数,除非你手动调用channel.wait(),而且channel.wait()又不支持超时!这个让人头疼的限制(我一直在碰到)意味着,如果你再也收不到消息,你的线程就会一直卡在那里。

9

我刚刚为 amqplibcarrot 中添加了超时支持。

这是 amqplib.client0_8.Connection 的一个子类:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multichannel.wait 的一个版本,可以在任意数量的通道上接收数据。

我想这可能在某个时候会合并到主项目中。

撰写回答