有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java为什么Apache Kafka消费者不使用来自主题的消息?

我花了四天时间解决不了这个问题。消费者不会收到来自制作人放置在那里的卡夫卡主题的消息。下面是消费者配置

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${fetch.max.bytes:1048576}")
    private String maxReceivedMessageSize;

    . . . . . . . . . . . . . .
    @Bean
    public ConsumerFactory<String, JsonNode> consumerFactory() {
        JsonDeserializer<JsonNode> jsonDeserializer = new JsonDeserializer<>(JsonNode.class);
        jsonDeserializer.addTrustedPackages(ClassUtils.getPackageName(ObjectNode.class));

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, jsonDeserializer);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxReceivedMessageSize);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxReceivedMessageSize);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, JsonNode> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, JsonNode> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

下面是获取的值。应用程序中定义的最大字节数。属性文件:

fetch.max.bytes=20048588

以下是卡夫卡消费者类别的定义:

@Service
@RequiredArgsConstructor
public class KafkaConsumer {
    private final TopicService topicService;

    @KafkaListener(topics = {
            "configurationMessages",
            "metaDevicesMessages",
            "accidentsMessages",
            "sessionsMessages",
            "ticketsMessages"
    }, groupId = groupId)
    public void consume(JsonNode jsonNode) {
        topicService.consume(jsonNode);
    }
}

下面是topicService的定义。使用(jsonNode)方法:

@Service
@RequiredArgsConstructor
public class TopicService {
. . . . . . . . . . . . . . .
    public void consume(JsonNode incomingMessage) {
        String code = Utils.safeGetOrDefault(incomingMessage.get("code"), JsonNode::textValue, EMPTY);
        switch (code) {
            case "configurationMessage":
                consumeConfigurationMessage(incomingMessage);
                break;
            case "accidentChangedMessage":
                consumeAccidentChangedMessage(incomingMessage);
                break;
            case "allMetaDevicesMessage":
                consumeAllMetaDevicesMessage(incomingMessage);
                break;
            case "accidentsMessage":
                consumeAccidentsMessage(incomingMessage);
                break;
            case "allTicketsMessage":
                consumeAllTicketsMessage(incomingMessage);
                break;
            case "ticketChangedMessage":
                consumeTicketChangedMessage(incomingMessage);
                break;
            case "sessionMessage":
                consumeSessionMessage(incomingMessage);
                break;
            case "allSessionsMessage":
                consumeAllSessionsMessage(incomingMessage);
                break;
            case "sessionElementMessage":
                consumeSessionElementMessage(incomingMessage);
                break;
            case "allAccidentsMessage":
                consumeAllAccidentsMessage(incomingMessage);
                break;
            default: {
                String errorMessage = "Error in TopicService.consume() method: \"Wrong message code:\" " + code;
                IncomingMessageProcessingError errorInfo = new IncomingMessageProcessingError(errorMessage, incomingMessage);
                JsonNode failed = new ObjectMapper().convertValue(errorInfo, JsonNode.class);
                kafkaTemplate.send("messageErrors", failed);
            }
        }
    }

    // Below are methods for handling messages received from kafka, as well as helper methods.
    . . . . . . . . . . . . . . . . . . . . . . .
}

其中变量code是唯一标识消费者从主题中读取的消息的字符串。和

 org.apache.commons.lang3.StringUtils.EMPTY;

代码的默认值

TopicService是一项消费者服务,必须处理从卡夫卡主题接收的消息。 因此,应用程序中运行着两个主要服务,一个是将消息放入kafka的生产者服务,另一个是必须从主题中读取消息并进行相应处理的消费者服务--TopicService。 为什么TopicService消费者要么根本不接受来自主题的消息,要么很少接受它们,尽管服务制作人会在一定的时间间隔(我会跟踪这一点)不断地向卡夫卡主题发送消息。我做错了什么?非常感谢你的帮助


共 (0) 个答案