有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

基于时间戳的java检索卡夫卡主题中的偏移量

我曾尝试根据TimesMap检索偏移量,但当我运行代码时,它在此处抛出空指针错误“Long seekOffset=outoffset.get(partition.offset();”因为特定时间没有偏移量,所以我的问题是如何在特定时间得到最近的偏移量

下面是我试过的代码


Long startTimestamp=Instant.now().minus (10, ChronoUnit.MINUTES ).toEpochMilli();

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset); 

任何帮助都将不胜感激!!提前谢谢


共 (1) 个答案

  1. # 1 楼答案

    方法offsetsForTimes返回时间戳大于或等于目标时间戳的第一条消息的偏移量。如果为空,则不存在此类消息。在这种情况下,只需将消费者放在末端即可

    for (TopicPartition partition : partitions) {
              OffsetAndTimestamp seekOffset = outOffsets.get(partition);
              if(seekOffset!=null){
                 consumer.seek(partition, seekOffset.offset()); 
              }else{
                 consumer.seekToEnd(Collections.singleton(partition));
              }
    }