有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Spark结构化流媒体:当前批次落后

它的实现似乎非常简单,但似乎存在一些问题

此作业从kafka主题读取偏移量(ui事件数据),进行一些聚合并将其写入Aerospike数据库

在高流量的情况下,我开始看到作业运行正常但没有插入新数据的问题。查看日志,我看到以下警告消息:

Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 43491 milliseconds

有几次工作继续写数据,但我可以看到计数很低,这表明有一些数据丢失

代码如下:

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", newTopic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
          .option("failOnDataLoss", false)
          .load();
StreamingQuery query = stream
        .writeStream()
        .option("startingOffsets", "earliest")
        .outputMode(OutputMode.Append())
        .foreach(sink)
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .queryName(queryName)
        .start();

共 (1) 个答案

  1. # 1 楼答案

    您可能需要处理maxOffsetsPerTrigger来调整每个批次的总输入记录。否则,应用程序的延迟可能会在一个批次中带来更多记录,因此它会减慢下一个批次的速度,进而在接下来的批次中带来更多延迟

    有关卡夫卡配置的更多详细信息,请参阅下面的链接

    https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html