java KafkaStreams SerializationException:创建视图时未知的魔法字节失败
我在创建包含AVRO记录的主题顶部的视图时遇到困难。尽管我相当确定在这个主题中只有AVRO值,但我一直得到以下例外:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000006, topic=topictwo, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
该项目遵循的管道如下:有一个JDBC源连接器正在运行,它从数据库导入数据,并每20秒更新一次。连接器将数据发布到第一个主题topicone
然后,有一个JavaSpring组件处理topicone
中的数据,并将经过优化的数据发布到另一个主题topictwo
。下面是这样的处理器:
@Autowired
public void processRawValues(StreamsBuilder builder) {
final Serde<RawValue> rawValueSpecificAvroSerde = new SpecificAvroSerde<>();
final Serde<RefinedValue> refinedValueSpecificAvroSerde = new SpecificAvroSerde<>();
Map<String, String> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, configs.getSchemaRegistryURL());
rawValueSpecificAvroSerde.configure(props, false);
refinedValueSpecificAvroSerde.configure(props, false);
KStream<String, RawValues> rawStream = builder.stream("topicone",
Consumed.with(Serdes.String(), rawValueSpecificAvroSerde));
rawStream.mapValues(helperClass::refineValues)
.map((key, value) -> KeyValue.pair(value.getKey(), value))
.to("topictwo", Produced.with(Serdes.String(), refinedValueSpecificAvroSerde));
}
最后,我尝试使用以下代码创建视图:
@Autowired
public void buildView(StreamsBuilder builder) {
final Serde<RefinedValue> refinedValueSpecificAvroSerde = new SpecificAvroSerde<>();
Map<String, String> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, configs.getSchemaRegistryURL());
refinedValueSpecificAvroSerde.configure(props, false);
builder.table("topictwo",
Consumed.with(Serdes.String(), refinedValueSpecificAvroSerde),
Materialized.as("viewname"));
}
我应该提到的是,使用以下方式正确且没有任何问题:
- 像这样使用卡夫卡avro控制台消费者:
kafka-avro-console-consumer --bootstrap-server BOOTSTRAP --property schema.registry.url=SCHEMA_REGISTRY --topic topictwo --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --from-beginning
- 使用Spring集成创建使用者:
@KafkaListener(topics = LISTENED_TOPICS, groupId = GROUP_ID) public void consumeRawValues(ConsumerRecord<String, RefinedValue> record) { logger.info(record.value().toString()); }
这就是为什么我非常确定,当这些值被发布到topictwo
时,它们会被正确地序列化到AVRO记录中。有人能帮我理解我做错了什么吗
共 (0) 个答案