有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java KafkaStreams SerializationException:创建视图时未知的魔法字节失败

我在创建包含AVRO记录的主题顶部的视图时遇到困难。尽管我相当确定在这个主题中只有AVRO值,但我一直得到以下例外:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000006, topic=topictwo, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

该项目遵循的管道如下:有一个JDBC源连接器正在运行,它从数据库导入数据,并每20秒更新一次。连接器将数据发布到第一个主题topicone

然后,有一个JavaSpring组件处理topicone中的数据,并将经过优化的数据发布到另一个主题topictwo。下面是这样的处理器:

@Autowired
public void processRawValues(StreamsBuilder builder) {
    final Serde<RawValue> rawValueSpecificAvroSerde = new SpecificAvroSerde<>();
    final Serde<RefinedValue> refinedValueSpecificAvroSerde = new SpecificAvroSerde<>();
    Map<String, String> props = new HashMap<>();
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, configs.getSchemaRegistryURL());
    rawValueSpecificAvroSerde.configure(props, false);
    refinedValueSpecificAvroSerde.configure(props, false);
    KStream<String, RawValues> rawStream = builder.stream("topicone",
            Consumed.with(Serdes.String(), rawValueSpecificAvroSerde));
    rawStream.mapValues(helperClass::refineValues)
            .map((key, value) -> KeyValue.pair(value.getKey(), value))
            .to("topictwo", Produced.with(Serdes.String(), refinedValueSpecificAvroSerde));
}

最后,我尝试使用以下代码创建视图:

@Autowired
public void buildView(StreamsBuilder builder) {
    final Serde<RefinedValue> refinedValueSpecificAvroSerde = new SpecificAvroSerde<>();
    Map<String, String> props = new HashMap<>();
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, configs.getSchemaRegistryURL());
    refinedValueSpecificAvroSerde.configure(props, false);
    builder.table("topictwo",
        Consumed.with(Serdes.String(), refinedValueSpecificAvroSerde),
        Materialized.as("viewname"));
}

我应该提到的是,使用以下方式正确且没有任何问题:

  • 像这样使用卡夫卡avro控制台消费者:kafka-avro-console-consumer --bootstrap-server BOOTSTRAP --property schema.registry.url=SCHEMA_REGISTRY --topic topictwo --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --from-beginning
  • 使用Spring集成创建使用者:
    @KafkaListener(topics = LISTENED_TOPICS, groupId = GROUP_ID)
    public void consumeRawValues(ConsumerRecord<String, RefinedValue> record) {
        logger.info(record.value().toString());
    }
    

这就是为什么我非常确定,当这些值被发布到topictwo时,它们会被正确地序列化到AVRO记录中。有人能帮我理解我做错了什么吗


共 (0) 个答案