Java RecordTooLargeException in Kafka Streams join

I have a KStream x KStream join that is crashing with the following exception:

Exception in thread "my-clicks-and-recs-join-streams-4c903fb1-5938-4919-9c56-2c8043b86986-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_15, processor=KSTREAM-SOURCE-0000000001, topic=my_outgoing_recs_prod, partition=15, offset=9248896
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_15] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:105)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:107)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:100)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

I am joining a Click topic with a Recommendation topic. The Click objects are very small (under 1 KB). The Recommendation objects, on the other hand, can be large, occasionally exceeding 1 MB.

I searched for the exception and found (here) that max.request.size needs to be set in the producer config.

What I don't understand is where a producer comes into the picture at all. The topic in the exception above, topic=my_outgoing_recs_prod, is the recommendations topic, not the final joined topic. Shouldn't the streams application just be "consuming" from it?

Nevertheless, I tried setting the property with config.put("max.request.size", "31457280");, i.e. 30 MB. I don't expect any recommendation record to exceed that limit, yet the code still crashes.

I cannot change the configuration of the Kafka cluster, but I could change the properties of the relevant topics in Kafka if needed.

Can someone suggest what else I could try?

If nothing works, I am willing to simply ignore these oversized messages. However, I don't know how to handle that case either.

My code performing the join looks like this:

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, JOINER_ID + "-" + System.getenv("HOSTNAME"));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put("max.request.size", "314572800");
config.put("message.max.bytes", "314572800");
config.put("max.message.bytes", "314572800");


KStreamBuilder builder = new KStreamBuilder();

KStream<String, byte[]> clicksStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), clicksTopic);
KStream<String, byte[]> recsStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), recsTopic);

KStream<String, ClickRec> join = clicksStream.join(
        recsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3*windowMillis));

join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

KafkaStreams streams = new KafkaStreams(builder, config);
streams.cleanUp();
streams.start();

ClickRec is the joined object (it is far smaller than a Recommendation object, and I don't expect it to be larger than a few KBs).

Should I put a try...catch somewhere in the code above to recover from these occasionally oversized objects?


1 Answer

  1. # Answer 1

    There are multiple configurations at different levels:

    1. The broker has the setting message.max.bytes (default is 1000012) (cf. http://kafka.apache.org/documentation/#brokerconfigs)
    2. There is a topic-level config max.message.bytes (default is 1000012) (cf. http://kafka.apache.org/documentation/#topicconfigs)
    3. The producer has max.request.size (default is 1048576) (cf. http://kafka.apache.org/documentation/#producerconfigs)

    The stack trace indicates that you need to change the setting at the broker or topic level:

    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
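
    Since you say you cannot touch the cluster config but can change topic properties, a topic-level max.message.bytes override is the most direct option. Below is a minimal sketch using the AdminClient API; it assumes Kafka 0.11+ clients, and the broker address, topic name, and size value are placeholders (the same change can be made with the kafka-configs.sh tool). Note that, as explained at the end of this answer, the join writes into internal changelog topics, so those may be the topics that actually need the override.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class RaiseTopicMessageSize {
        public static void main(String[] args) throws Exception {
            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", "broker:9092"); // placeholder broker address

            try (AdminClient admin = AdminClient.create(adminProps)) {
                // Topic-level override: allow messages up to ~2 MB on this topic (placeholder name and size)
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "some-topic");
                ConfigEntry entry = new ConfigEntry("max.message.bytes", "2097152");
                admin.alterConfigs(Collections.singletonMap(topic, new Config(Collections.singletonList(entry))))
                     .all()
                     .get(); // block until the brokers have applied the change
            }
        }
    }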

    You probably also need to increase the producer setting.
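
    A minimal sketch of how the producer setting can be raised from inside the Streams application, assuming a Kafka Streams version that provides StreamsConfig.producerPrefix(); the prefix makes it explicit that the value is meant for the internally created producer rather than the consumer:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties config = new Properties();
    // ... application.id, bootstrap.servers, and serdes as in the question ...

    // Forward max.request.size to the producer that Kafka Streams creates internally
    config.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), "2097152"); // e.g. 2 MB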

    Why you need all this in the first place:

    When performing a KStream-KStream join, the join operator builds up state (it must buffer records from both streams in order to compute the join). By default, that state is backed by a Kafka topic. The local state is basically a cache, while the Kafka topic is the source of truth. Thus, all of your records are written to these "changelog topics" that Kafka Streams creates automatically.
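
    Those changelog topics are named <application.id>-<storeName>-changelog, so once they exist their max.message.bytes can be raised like for any other topic (see the AdminClient sketch above). Newer Kafka Streams versions also allow passing topic-level overrides for these internal topics through the Streams config; a minimal sketch, assuming a version that provides StreamsConfig.topicPrefix():

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties config = new Properties();
    // ... application.id, bootstrap.servers, and serdes as in the question ...

    // Topic-level override applied to the internal (changelog/repartition) topics Kafka Streams creates
    config.put(StreamsConfig.topicPrefix("max.message.bytes"), "2097152"); // e.g. 2 MB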