如何在RabbitMQ服务器上设置超时检测?

17 投票
3 回答
13569 浏览
提问于 2025-04-15 13:55

我正在尝试使用RabbitMQ这个Python库。

我注意到一个问题,就是如果我强制关闭一个消费者(就像程序崩溃一样),服务器会认为这个消费者还在,持续很长时间。结果就是每隔一条消息就会被忽略。

举个例子,如果你关闭一个消费者然后重新连接,那么一半的消息会被忽略。如果你再关闭一个消费者,那么三分之二的消息会被忽略。如果你再关闭第三个消费者,那么四分之三的消息会被忽略,依此类推。

我尝试过开启确认机制,但似乎没有什么帮助。我找到的唯一解决办法就是手动停止服务器并重置它。

有没有更好的方法呢?

如何重现这个场景

  • 运行RabbitMQ。

  • 解压这个库

  • 这里下载消费者和发布者的代码。运行amqp_consumer.py两次。运行amqp_publisher.py,输入一些数据,观察它是否按预期工作。消息会以轮询的方式接收。

  • 用kill -9或者任务管理器关闭其中一个消费者进程。

  • 现在当你发布一条消息时,50%的消息会丢失。

3 个回答

2

请提供一些关于你声明的组件的具体信息。通常来说(不管客户端是怎么实现的),一个具有以下特性的队列:

  • 独占的(exclusive)和
  • 自动删除的(auto-delete)

应该在声明的客户端和消息代理(broker)之间的连接断开后立即被移除。不过,这对共享队列来说就没什么帮助了。请详细说明一下你到底想要模拟什么。

5

RabbitMQ 在确认消息处理时并没有设置超时时间,也就是说,客户端在处理完消息后,不需要在一定时间内给出确认。你可以查看这篇帖子,里面的讨论可能会对你有帮助。以下是帖子中的一些重点:

AMQP 的确认机制对于订阅和“拉取”是一样的。在这两种情况下,消息会保留在服务器上,但在确认之前,其他消费者无法获取这条消息。确认的方式有三种:要么被确认(ack),这样消息就会被移除;要么被拒绝(nack),不过 RabbitMQ 并没有实现这个功能;或者是通道/连接关闭,这时消息就会变得可供其他消费者使用。

还有(我强调的部分)

在等待确认的过程中没有超时限制。通常这不是问题,因为常见的缺少确认的情况,比如网络问题或客户端故障,都会导致连接断开(从而触发上面提到的行为)。不过,设置一个超时可能会有用,比如用来处理活着但没有响应的消费者。这个问题之前也讨论过。你有没有具体的使用场景需要这样的功能呢?

这个问题可能是因为在客户端拉取模式下,服务器更难检测到连接是否断开(与活着但没有响应的消费者相比),特别是服务器似乎愿意一直等待确认。

更新:在 Linux 上,你可以为 SIGTERM 和/或 SIGKILL 和/或 SIGINT 附加信号处理程序,这样可以希望从客户端有序地关闭连接。在 Windows 上,我认为从任务管理器关闭会调用 Win32 的 TerminateProcess API,关于这个,MSDN 说:

如果一个进程是通过 TerminateProcess 终止的,所有线程会立即被终止,无法执行额外的代码。这意味着线程不会执行终止处理程序中的代码。此外,没有附加的 DLL 会被通知进程正在分离。

这意味着可能很难捕捉到终止信号并有序地关闭连接。

如果你有自己的使用场景,可以在 RabbitMQ 的讨论列表上提出来,看看是否值得讨论确认超时的问题。

11

我在这个压缩包里没看到 amqp_consumer.pyamqp_producer.py,所以要重现这个问题有点难。

RabbitMQ 会在操作系统告诉它某个连接关闭时,终止连接并释放那些未确认的消息,以便重新发送给其他客户端。你的情况很奇怪,因为即使是用 kill -9 命令,也应该能正常清理 TCP 连接。

有些人发现,当 AMQP 客户端和服务器之间有防火墙或 NAT 设备时,连接的存活时间比预期的要长。这可能是你遇到的问题吗?还是说你所有的东西都在本地运行?另外,你的系统各个部分运行在哪个操作系统上呢?

补充说明:从你下面的评论来看,我猜测你在 Linux 上运行服务器,而客户端可能是在 Windows 上。如果是这样的话,可能是 Windows 的 TCP 驱动没有正确关闭连接,这和 Unix 上的 kill -9 行为是不同的。(在 Unix 上,内核会正确关闭任何被杀掉的进程的 TCP 连接。)

如果真是这样,那么坏消息是 RabbitMQ 只能在连接关闭时释放资源,所以如果客户端操作系统不这样做,它就无能为力。这和几乎所有其他基于 TCP 的服务都是一样的。

不过好消息是,AMQP 支持一种“心跳”选项,正好适用于这种网络不可靠的情况。你可以尝试启用心跳功能。当启用后,如果服务器在设定的时间内没有收到任何流量,它就会认为连接已经断开。

然而坏消息是,我觉得目前 py-amqplib 不支持心跳功能。不过试试看也是值得的!

撰写回答