多线程当使用RabbitMQ作为java工作队列时,您应该如何处理并发性和瞬时错误?
我正在考虑设置一个RabbitMQ代理来处理java web应用程序的基本任务处理。其基本思想是,生产者是一个web服务器,它希望能够快速处理请求,但也需要进行一些数据处理。为了实现这一点,它知道一些可以序列化为AMQP消息的作业DTO,并且在某个地方有一个消费者知道如何处理这些作业。经典
在阅读了RabbitMQ文档之后,我还有几个问题要问
我希望消费者应用程序充分利用其CPU来处理消息,我想知道RabbitMQ是否已经提供了工作池。假设我需要4个工作线程,那么在同一个连接上注册4个通道,每个通道有1个使用者就足够了吗?在这种情况下,工作将在handleDelivery()中完成,如果成功完成,将发送ack。或者更确切地说,我应该使用1个消费者并在我自己的应用程序层管理一个工作人员池吗
消费者将与数据库对话,这意味着会发生暂时性错误;死锁、乐观锁定冲突、数据库服务器重新启动等等。如果消费者无法处理工作,预期的行为是什么?发出nack并等待代理重新发送作业,或者延迟ack,直到确定作业无法完成为止?还是没关系
值得注意的是,除非数据库正在重新启动或其他操作,否则作业通常不会花费超过几秒钟的时间。我感谢任何指导
# 1 楼答案
1a。如果需要固定数量的工作线程,那么应该打开多个通道,并为每个通道注册一个使用者。RabbitMQ每个通道有一个调度程序线程,因此一个使用者将阻止在同一通道上注册的其他使用者
1b。如果需要动态增长的工作池,那么您可以考虑使用{a1}(假设您在java中实现)。具体来说,SimpleMessageListenerContainer根据工作负载动态管理不断增长和收缩的消费者群。请参阅Spring AMQP参考指南中的"Listeners Concurrency"部分
2a。如果您知道工作人员出现故障,您可以nack,消息将被传递给另一个消费者。除非处于“自动确认”模式,否则在消费者确认之前,该消息不会传递给其他消费者