有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

JavaSpringKafka与springintegration的并发性

我不知道我是否遗漏了一些明显的东西,或者spring-integration-kafka:3.0.1中有一个bug,因为它试图让多个消费者为一个主题运行。该场景是一个带有10个分区的Kafka主题,以及一个侦听该主题的springboot应用程序。相关配置为:

应用程序。yml:

spring:
  kafka:
    consumer:
      group-id: test-consumer
      auto-offset-reset: earliest
    listener:
      concurrency: 4

配置:

@Configuration
@EnableIntegration
@IntegrationComponentScan("com.test")
public class MessageConfig {
    @Bean
    public MessageChannel testReceiveChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow testReceiveFlow(@Qualifier("kafkaConsumerFactory") final ConsumerFactory<?, ?> kafkaConsumer, final MessageChannel testReceiveChannel) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic"))
                .transform(new JsonToObjectTransformer(EventMessage.class))
                .channel(testReceiveChannel)
                .get();
    }
}

听众:

@Component
public class EventListener {
    private static final Logger LOG = LoggerFactory.getLogger(EventListener.class);

    @ServiceActivator(inputChannel = "testReceiveChannel")
    public void processMessage(final EventMessage message) {
        LOG.info("Got message {} on {}", message.getValue(), Thread.currentThread().getName());
    }
}

开始时,我只得到一个容器在所有10个分区上侦听。我可以看到ConcurrentKafkaListenerContainerFactory上设置了正确的并发值,但似乎从未调用过它的initializeContainer方法(如果我理解正确,这将应用于实际的使用者)。然而,我可能是在看一件完全错误的事情

知道我忽略了什么吗


共 (1) 个答案

  1. # 1 楼答案

    Spring启动KafkaProperties(例如spring.kafka.listener.concurrency = 4)和所提到的ConcurrentKafkaListenerContainerFactory应用于@KafkaListener组件。这与Spring集成无关。至少是自动的

    您需要手动执行此操作:

    Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic")
          .configureListenerContainer(c ->
                            c.concurrency(this.kafkaProperties.getListener().getConcurrency()))