有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

在java中,有没有一种方法可以从一开始就使用Kafka流(而不是通过KafkaConsumer)读取消息?

我们正在创建一个POC来读取数据库CDC并将其推送到外部系统

  1. 每个源表CDC都以Avro格式发送到各自的主题(使用卡夫卡模式注册表和卡夫卡服务器)
  2. 我们正在编写java代码来使用avro模式中的消息,使用AvroSerde对其进行反序列化,并将它们连接起来,然后发送到不同的主题,以便外部系统可以使用它

不过,我们有一个限制,即不能向源表主题生成消息来发送/接收新内容/更改。所以,编写连接代码的唯一方法是在运行应用程序时,每次从每个源主题开始读取消息。(直到我们确信代码正在工作并可以再次开始接收实时数据)

在KafkaConsumer对象中,我们可以选择使用SeekToBegining方法强制从jave代码的开头读取,这很有效。然而,当我们尝试使用KStream对象流式传输主题并强制从头开始读取它时,没有选择。这里有什么替代方案

我们尝试使用kafka consumer Group reset topic将偏移量重置为--to Earlime,但这只将偏移量设置为最近的偏移量。当我们尝试用“0”手动重置偏移量时,使用--to offset参数,我们会得到低于警告的值,但不会设置为“0”。我的理解是,设置为0应该从一开始就阅读消息。如果我错了,请纠正我

“警告新偏移量(0)低于主题分区的最早偏移量”

下面是示例代码

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);

StreamsBuilder builder = new StreamsBuilder();
//nothing returned here, when some offset has already been set
KStream myStream = builder.stream("my-topic-in-avro-schema",ConsumedWith(myKeySerde,myValueSerde)); 

KafkaStreams streams = new KafkaStreams(builder.build(),properties);
streams.start();

共 (1) 个答案

  1. # 1 楼答案

    它将帮助同样面临同样问题的人。使用UUID将应用程序Id和组Id替换为某个唯一标识符。随机的。配置属性中的toString()。它应该从一开始就获取消息