有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java无法使用kafka管理客户端API创建具有所需分区的kafka主题

我正在使用Kafka管理客户端API创建主题。正在创建主题,但是默认情况下,主题是使用1个分区创建的。API不遵守提供的可配置值。不确定我是否正确使用了它

注意:主题创建是在代理级别启用的。主题也正在创建中,但它是使用分区1创建的

NewTopic newTopic = new NewTopic(TOPIC_NAME, 10, (short) 1);
        CreateTopicsResult createTopicsResult = null;
        try {
            createTopicsResult = KafkaAdminClient.create(getAdminProperties()).createTopics(Collections.singletonList(newTopic));
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

但是,我能够使用Kafka管理客户端API增加先前创建的主题的分区


共 (1) 个答案

  1. # 1 楼答案

    我试图用以下代码重现这一点,但没有成功:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class AdminApiDemo {
    
        private static final String BOOTSRAP_SERVER = "localhost:9092";
        private static final String TOPIC_NAME = "demoTopic";
        private static final int NUM_PARTITIONS = 3;
        private static final short NUM_REPLICAS = 1;
    
        private final AdminClient adminClient;
    
        private AdminApiDemo(Properties properties) {
            this.adminClient = KafkaAdminClient.create(properties);
        }
    
        public static void main(String[] args) {
            final Properties properties = new Properties();
            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSRAP_SERVER);
    
            new AdminApiDemo(properties).createTopic(TOPIC_NAME, NUM_PARTITIONS, NUM_REPLICAS);
        }
    
        private void createTopic(String topicName, int numPartitions, short numReplicas) {
            try {
                final NewTopic newTopic = new NewTopic(topicName, numPartitions, numReplicas);
                final CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
                result.values().get(topicName).get(5, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    A kafka-topics describe显示如下:

    root@kafka:/# kafka-topics  bootstrap-server localhost:9092  describe  topic demoTopic
    Topic:demoTopic PartitionCount:3    ReplicationFactor:1 Configs:
        Topic: demoTopic    Partition: 0    Leader: 1   Replicas: 1 Isr: 1
        Topic: demoTopic    Partition: 1    Leader: 1   Replicas: 1 Isr: 1
        Topic: demoTopic    Partition: 2    Leader: 1   Replicas: 1 Isr: 1
    

    我想,好吧,如果这个话题可能在创作之前就存在,那该怎么办,但我又得到了一个java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'demoTopic' already exists.,所以这也不可能是你的情况

    我知道这不是“真正的”答案,它解决了任何问题,对此我很抱歉。但我希望它能有所帮助。也许其他人可以用它在他的环境中重现,并“看到”问题