有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用单个json,但发布多个AVRO消息

我不熟悉春天的卡夫卡河。我最近设置了一个项目,并尝试使用kafka stream API发布AVRO。 要求: 需要解析可能包含多个子JSON的复杂Jsonobject。解析json并为每个子json发送AVRO消息。它也可以是一个或多个

我知道如何使用kafka客户端API实现这一点。这很简单,可以用循环完成,但用流API,我是新手

流类的当前代码如下所示:

@Service
@Log4j2
@EnableBinding(StreamBindings.class)
public class StreamListener {

    @Autowired
    Service service; 
    
    @StreamListener("Processor-input-channel")
    @SendTo("Processor-output-channel")
    public KStream<String, AVROClass> process(KStream<String, String> input){
        //parse input , map to fault and change received key to time stamp and send 
        KStream<String, AVROClass> kStream = input
                .mapValues(v -> service.getAVROResponse(v))
                .map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v));

        kStream.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
        
        
        return kStream;
        
    }

我一次生成一个AVRO,但我需要知道如何生成多个AVRO,并在卡夫卡中将其作为单独的消息发送到输出主题。因为我从getAVROResponse返回的任何内容都将通过流自动发送


共 (2) 个答案

  1. # 1 楼答案

    要使用JSON,需要使用String/JSON-Serde,而不是AvroClass(假设这实际上是一个Avro SpecificRecord子类)

    否则,您似乎缺少Avro Serde,Consumed.with(<<avroSerde>>)),或者在StreamsConfig中将其设置为默认Serde

    如果要根据某些条件(嵌套类型)将传入消息分离到几个不同的主题,请使用branch

    如果要将一条消息转换为多条消息,然后转换为某个输出主题,请使用flatMapValues().to()

    要生成Avro,请确保在.to()输出中使用Produced.with()

  2. # 2 楼答案

    您应该能够使用Kafka Streams和Spring Cloud Stream完成这个用例,如下所示。请注意,不再建议使用StreamListener,建议使用函数方法。见下文

    这只是一个psuedo代码,请根据需要进行更新

    @SpringBootApplication
    public class MyApplication {
    
        @Bean
        public Function<KStream<String, String>, KStream<String, AvroClass>> process() {
            return input -> input.mapValues(v -> service.getAVROResponse(v))
                    .map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v));
        }
    
        @Bean
        public Serde<AvroClass> avroInSerde(){
            final SpecificAvroSerde<AvroClass> avroSerde = new SpecificAvroSerde<>();
            avroSerde.configure(...)
            return avroInSerde;
        }
    
    }
    

    在配置中,您需要根据您的设置执行此操作。请适当更新架构注册表URL

    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                configuration:
                  schema.registry.url: http://localhost:8081
                  specific.avro.reader: true
    

    如果您不想像上面那样将Avro-serde指定为Springbean,那么可以按照下面的方式进行配置

    spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    

    然而,我认为将avroserde指定为springbean可能是一种更干净的方法,因为springcloudstreambinder可以将函数签名的类型与serdebean的类型进行匹配。例如,在这种情况下,绑定器将对函数进行内省,并将其输出签名KStream<String, AvroClass>与Serde bean上定义的内容Serde<AvroClass>相匹配,并将其指定为出站上的值Serde