有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

消息中心上的java Kafka Streams KTable配置错误

此问题现在已在消息中心解决

我在卡夫卡中创建KTable时遇到了一些问题。我刚接触卡夫卡,这可能是我问题的根源,但我想我还是可以在这里问一下。我有一个项目,我想通过计算不同ID的总发生率来跟踪它们。我正在使用ibmcloud上的Message Hub来管理我的主题,到目前为止它工作得非常出色

我有一个关于MessageHub的主题,它会生成像{"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}这样的消息,目前,唯一相关的键是ID

我的卡夫卡代码以及Streams配置如下所示:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

运行代码时,出现以下错误:

Exception in thread "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition.

其次是:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid configuration: {segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete, segment.ms=600000}. Only allowed configs: [retention.ms, cleanup.policy]

我不知道为什么会发生这种错误,以及如何处理它。我构建KStream和KTable的方式是否有点不正确?或者bluemix上的消息中心

已解决:

在我标记为正确的答案下面添加评论摘录。事实证明,我的StreamsConfig很好,而且(目前)Message Hub方面存在一个问题,但有一个解决方法:

事实证明,Message Hub在使用Kafka Streams 1.1创建重新分区主题时存在问题。当我们进行修复时,您需要手动创建主题KTableTest-KSTREAM-AGGREGATE-STATE-STORE-000000000 3-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。修复后,我将发布另一条评论

非常感谢你的帮助


共 (1) 个答案

  1. # 1 楼答案

    消息中心在创建主题时可以使用的配置上有一些restrictions

    从您收到的PolicyViolationException来看,您的Streams应用程序似乎试图使用一些我们不允许的配置:

    • 部分。指数字节
    • 部分。字节
    • 部分。ms

    我猜您在Streams配置中的某个位置设置了它们,应该将它们删除

    请注意,您还需要在配置中将StreamsConfig.REPLICATION_FACTOR_CONFIG设置为3,以使用消息中心,如我们的docs中所述