有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java spring rabbitmq获取对扇出消息的所有回复

中的以下类包含在多个消费者应用程序中:

@Component
@Configuration
public class HealthListener {

    public static final String HEALTH_CHECK_QUEUE_NAME = "healthCheckQueue";
    public static final String HEALTH_CHECK_FANOUT_EXCHANGE_NAME = "health-check-fanout";


    @Bean
    public Binding healthListenerBinding(
            @Qualifier("healthCheckQueue") Queue queue,
            @Qualifier("instanceFanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public FanoutExchange instanceFanoutExchange() {
        return new FanoutExchange(HEALTH_CHECK_FANOUT_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue healthCheckQueue() {
        return new Queue(HEALTH_CHECK_QUEUE_NAME);
    }

    @RabbitListener(queues = HEALTH_CHECK_QUEUE_NAME)
    public String healthCheck() {
        return "some result";
    }

}

我正在尝试向fanout exchange发送消息,并接收所有回复,以了解哪些消费者正在运行

我可以发送一条消息并得到如下第一个回复:

@Autowired
RabbitTemplate template;

// ...
String firstReply = template.convertSendAndReceiveAsType("health-check-fanout", "", "", ParameterizedTypeReference.forType(String.class));

然而,我需要得到所有的回复,而不仅仅是第一条。我需要设置一个回复侦听器,但我不确定如何设置


共 (1) 个答案

  1. # 1 楼答案

    (convertS|s)endAndReceive.*()方法的设计不是为了处理多个回复;严格来说,它们是一个请求/一个回复的方法

    您需要使用(convertAndS|s)end()方法来发送请求,并实现自己的应答机制,可能需要使用应答的侦听器容器,以及一些组件来聚合应答

    您可以使用类似于Spring集成聚合器的东西来实现这一点,但您需要某种机制(ReleaseStrategy),可以知道何时收到所有预期的回复

    或者,你可以简单地接收离散的回复,并单独处理它们

    编辑

    @SpringBootApplication
    public class So54207780Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54207780Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> template.convertAndSend("fanout", "", "foo", m -> {
                m.getMessageProperties().setReplyTo("replies");
                return m;
            });
        }
    
        @RabbitListener(queues = "queue1")
        public String listen1(String in) {
            return in.toUpperCase();
        }
    
        @RabbitListener(queues = "queue2")
        public String listen2(String in) {
            return in + in;
        }
    
        @RabbitListener(queues = "replies")
        public void replyHandler(String reply) {
            System.out.println(reply);
        }
    
        @Bean
        public FanoutExchange fanout() {
            return new FanoutExchange("fanout");
        }
    
        @Bean
        public Queue queue1() {
            return new Queue("queue1");
        }
    
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queue1()).to(fanout());
        }
    
        @Bean
        public Queue queue2() {
            return new Queue("queue2");
        }
    
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(queue2()).to(fanout());
        }
    
        @Bean
        public Queue replies() {
            return new Queue("replies");
        }
    
    }
    

    FOO
    foofoo