如何在RabbitMQ服务器上设置超时检测?
我注意到一个问题,就是如果我强制关闭一个消费者(就像程序崩溃一样),服务器会认为这个消费者还在,持续很长时间。结果就是每隔一条消息就会被忽略。
举个例子,如果你关闭一个消费者然后重新连接,那么一半的消息会被忽略。如果你再关闭一个消费者,那么三分之二的消息会被忽略。如果你再关闭第三个消费者,那么四分之三的消息会被忽略,依此类推。
我尝试过开启确认机制,但似乎没有什么帮助。我找到的唯一解决办法就是手动停止服务器并重置它。
有没有更好的方法呢?
如何重现这个场景
3 个回答
请提供一些关于你声明的组件的具体信息。通常来说(不管客户端是怎么实现的),一个具有以下特性的队列:
- 独占的(exclusive)和
- 自动删除的(auto-delete)
应该在声明的客户端和消息代理(broker)之间的连接断开后立即被移除。不过,这对共享队列来说就没什么帮助了。请详细说明一下你到底想要模拟什么。
RabbitMQ 在确认消息处理时并没有设置超时时间,也就是说,客户端在处理完消息后,不需要在一定时间内给出确认。你可以查看这篇帖子,里面的讨论可能会对你有帮助。以下是帖子中的一些重点:
AMQP 的确认机制对于订阅和“拉取”是一样的。在这两种情况下,消息会保留在服务器上,但在确认之前,其他消费者无法获取这条消息。确认的方式有三种:要么被确认(ack),这样消息就会被移除;要么被拒绝(nack),不过 RabbitMQ 并没有实现这个功能;或者是通道/连接关闭,这时消息就会变得可供其他消费者使用。
还有(我强调的部分)
在等待确认的过程中没有超时限制。通常这不是问题,因为常见的缺少确认的情况,比如网络问题或客户端故障,都会导致连接断开(从而触发上面提到的行为)。不过,设置一个超时可能会有用,比如用来处理活着但没有响应的消费者。这个问题之前也讨论过。你有没有具体的使用场景需要这样的功能呢?
这个问题可能是因为在客户端拉取模式下,服务器更难检测到连接是否断开(与活着但没有响应的消费者相比),特别是服务器似乎愿意一直等待确认。
更新:在 Linux 上,你可以为 SIGTERM 和/或 SIGKILL 和/或 SIGINT 附加信号处理程序,这样可以希望从客户端有序地关闭连接。在 Windows 上,我认为从任务管理器关闭会调用 Win32 的 TerminateProcess
API,关于这个,MSDN 说:
如果一个进程是通过
TerminateProcess
终止的,所有线程会立即被终止,无法执行额外的代码。这意味着线程不会执行终止处理程序中的代码。此外,没有附加的 DLL 会被通知进程正在分离。
这意味着可能很难捕捉到终止信号并有序地关闭连接。
如果你有自己的使用场景,可以在 RabbitMQ 的讨论列表上提出来,看看是否值得讨论确认超时的问题。
我在这个压缩包里没看到 amqp_consumer.py
或 amqp_producer.py
,所以要重现这个问题有点难。
RabbitMQ 会在操作系统告诉它某个连接关闭时,终止连接并释放那些未确认的消息,以便重新发送给其他客户端。你的情况很奇怪,因为即使是用 kill -9
命令,也应该能正常清理 TCP 连接。
有些人发现,当 AMQP 客户端和服务器之间有防火墙或 NAT 设备时,连接的存活时间比预期的要长。这可能是你遇到的问题吗?还是说你所有的东西都在本地运行?另外,你的系统各个部分运行在哪个操作系统上呢?
补充说明:从你下面的评论来看,我猜测你在 Linux 上运行服务器,而客户端可能是在 Windows 上。如果是这样的话,可能是 Windows 的 TCP 驱动没有正确关闭连接,这和 Unix 上的 kill -9
行为是不同的。(在 Unix 上,内核会正确关闭任何被杀掉的进程的 TCP 连接。)
如果真是这样,那么坏消息是 RabbitMQ 只能在连接关闭时释放资源,所以如果客户端操作系统不这样做,它就无能为力。这和几乎所有其他基于 TCP 的服务都是一样的。
不过好消息是,AMQP 支持一种“心跳”选项,正好适用于这种网络不可靠的情况。你可以尝试启用心跳功能。当启用后,如果服务器在设定的时间内没有收到任何流量,它就会认为连接已经断开。
然而坏消息是,我觉得目前 py-amqplib 不支持心跳功能。不过试试看也是值得的!