有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java为什么卡夫卡消费者不能产生结果?

作为卡夫卡学习练习,我编写了一个Java程序TsdbMetricToKafkaTopic将数据从openTSDB复制到卡夫卡主题,并编写了另一个Java程序DumpKafkaTopic打印结果;下面是DumpKafkaTopic的关键方法

我已经通过使用Kafka实用程序kafka-console-consumer.sh确认了我所期望的数据确实正在写入预期的主题然而,DumpKafkaTopic的行为很奇怪:当我运行producer,然后DumpKafkaTopic时,它会像我预期的那样打印结果。但是,如果我立即重新运行它,它将不会打印任何内容

认为因为我将auto.offset.reset设置为earliest,所以我的程序将是幂等的,也就是说,每次我运行它时,它都会产生相同的结果(直到我为主题编写其他内容)。为什么没有发生这种情况

public void dump( String kafka_topic ) {

    // Serializers/deserializers (serde) for key and value types
    final Serde<Long> long_serde = Serdes.Long();
    final Serde< TsdbObject > tsdb_object_serde = 
        Serdes.serdeFrom( new TsdbObject.TsdbObjectSerializer(), 
                          new TsdbObject.TsdbObjectDeserializer() );

    StreamsBuilder streams_builder = new StreamsBuilder();
    KStream< Long, TsdbObject > kstream = 
        streams_builder.stream( kafka_topic, Consumed.with( long_serde, tsdb_object_serde ) );

    // Add final operator, to print results to stdout:
    Printed< Long, TsdbObject > printed = Printed.toSysOut();
    kstream.print( printed );

    Map<String, Object> kstreams_props = new HashMap<>();
    kstreams_props.put(StreamsConfig.APPLICATION_ID_CONFIG, "DumpKafkaTopic");
    kstreams_props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // make sure to consume the complete topic via "auto.offset.reset = earliest"
    kstreams_props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsConfig kstreams_config = new StreamsConfig(kstreams_props);

    KafkaStreams kstreams = new KafkaStreams( streams_builder.build(), kstreams_config );
    System.out.println( "Starting DumpKafkaTopic stream " );

    kstreams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams (from https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/)
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println( "Stopping DumpKafkaTopic stream " );
                kstreams.close();
            }
        }));                
}

共 (0) 个答案