java为什么Apache Kafka消费者不使用来自主题的消息?
我花了四天时间解决不了这个问题。消费者不会收到来自制作人放置在那里的卡夫卡主题的消息。下面是消费者配置
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${fetch.max.bytes:1048576}")
private String maxReceivedMessageSize;
. . . . . . . . . . . . . .
@Bean
public ConsumerFactory<String, JsonNode> consumerFactory() {
JsonDeserializer<JsonNode> jsonDeserializer = new JsonDeserializer<>(JsonNode.class);
jsonDeserializer.addTrustedPackages(ClassUtils.getPackageName(ObjectNode.class));
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, jsonDeserializer);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxReceivedMessageSize);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxReceivedMessageSize);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, JsonNode> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, JsonNode> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
下面是获取的值。应用程序中定义的最大字节数。属性文件:
fetch.max.bytes=20048588
以下是卡夫卡消费者类别的定义:
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final TopicService topicService;
@KafkaListener(topics = {
"configurationMessages",
"metaDevicesMessages",
"accidentsMessages",
"sessionsMessages",
"ticketsMessages"
}, groupId = groupId)
public void consume(JsonNode jsonNode) {
topicService.consume(jsonNode);
}
}
下面是topicService的定义。使用(jsonNode)方法:
@Service
@RequiredArgsConstructor
public class TopicService {
. . . . . . . . . . . . . . .
public void consume(JsonNode incomingMessage) {
String code = Utils.safeGetOrDefault(incomingMessage.get("code"), JsonNode::textValue, EMPTY);
switch (code) {
case "configurationMessage":
consumeConfigurationMessage(incomingMessage);
break;
case "accidentChangedMessage":
consumeAccidentChangedMessage(incomingMessage);
break;
case "allMetaDevicesMessage":
consumeAllMetaDevicesMessage(incomingMessage);
break;
case "accidentsMessage":
consumeAccidentsMessage(incomingMessage);
break;
case "allTicketsMessage":
consumeAllTicketsMessage(incomingMessage);
break;
case "ticketChangedMessage":
consumeTicketChangedMessage(incomingMessage);
break;
case "sessionMessage":
consumeSessionMessage(incomingMessage);
break;
case "allSessionsMessage":
consumeAllSessionsMessage(incomingMessage);
break;
case "sessionElementMessage":
consumeSessionElementMessage(incomingMessage);
break;
case "allAccidentsMessage":
consumeAllAccidentsMessage(incomingMessage);
break;
default: {
String errorMessage = "Error in TopicService.consume() method: \"Wrong message code:\" " + code;
IncomingMessageProcessingError errorInfo = new IncomingMessageProcessingError(errorMessage, incomingMessage);
JsonNode failed = new ObjectMapper().convertValue(errorInfo, JsonNode.class);
kafkaTemplate.send("messageErrors", failed);
}
}
}
// Below are methods for handling messages received from kafka, as well as helper methods.
. . . . . . . . . . . . . . . . . . . . . . .
}
其中变量code是唯一标识消费者从主题中读取的消息的字符串。和空
org.apache.commons.lang3.StringUtils.EMPTY;
是代码的默认值
TopicService是一项消费者服务,必须处理从卡夫卡主题接收的消息。 因此,应用程序中运行着两个主要服务,一个是将消息放入kafka的生产者服务,另一个是必须从主题中读取消息并进行相应处理的消费者服务--TopicService。 为什么TopicService消费者要么根本不接受来自主题的消息,要么很少接受它们,尽管服务制作人会在一定的时间间隔(我会跟踪这一点)不断地向卡夫卡主题发送消息。我做错了什么?非常感谢你的帮助
共 (0) 个答案