有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何使用Quarkus在卡夫卡的同一主题中设置多个使用者

我正在使用Quarkus框架构建一个Kafka消费者,它将阅读一个包含3个分区的主题。下面的代码片段可以正常工作,但根据日志,我只是启动了一个消费者和三个分区。我现在的问题是,一旦我运行我的应用程序,我如何才能产生3个消费者

@Incoming("topic-1")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) throws IOException {

    LOG.info("Kafka order message with value = {} arrived from topic {} ", message.getPayload(),
            message.getTopic());

    //JsonObject event = new JsonObject(message.getPayload());

    try {
        if (true) {
            LOG.info("Kafka message: " + message);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    return message.ack();
}

请参阅示例日志:

INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=testconsumer, groupId=kafka-detection-consumer] Finished assignment for group at generation 64: {testconsumer-bf6d314c-44e1-47b1-9439-fe4058951841=Assignment(partitions=[test_part-0, test_part-1, test_part-2])}

enter image description here


共 (0) 个答案