有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java复杂聚合

我有一个主题中的数据需要在多个级别进行统计,所有代码和文章都只提到单词统计示例

数据的一个例子是:

序列号:123 国家:美国 日期:2018年5月1日 州:纽约 城市:纽约 参观人数:5人

序列号:123 国家:美国 日期:2018年6月1日 州:纽约 城市:皇后区 参观者:10人

序列号:456 日期:2018年6月1日 国家:美国 州:纽约 城市:皇后区 访客:27

序列号:123 日期:2018年6月1日 国家:美国 州:纽约 城市:纽约 参观者:867

我已经做了过滤,但聚合? 抱歉,Java 8和&;混合,我更喜欢8,但要同时学习

KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    .groupBy((key, value) -> value.serial)
    .aggregate(
            new Initializer<CountryVisitorModel>() {

            public CountryVisitorModelapply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {

            @Override
            public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {

    aggregate.serial = value.serial;
    aggregate.country_name = value.country_name;
    aggregate.city_name = value.city_name;

    aggregate.country_count++;
    aggregate.city_count++;
    aggregate.ip_count++;

        //
    return aggregate;
       }
},
Materialized.with(stringSerde, visitorSerde));

对于所有相同的序列号(这将是分组依据) 计算每个站点的访客总数:

国家/州/城市游客总数


共 (1) 个答案

  1. # 1 楼答案

    如果每个记录只贡献一个计数,我建议branch()流和每个子流的计数:

    KStream stream = builder.stream(...)
    KStream[] subStreams = stream.branch(...);
    
    // each record of `stream` will be contained in exactly _one_ `substream`
    subStream[0].grouByKey().count(); // or aggregate() instead of count()
    subStream[1].grouByKey().count();
    // ...
    

    如果分支不起作用,因为一条记录需要进行多次计数,您可以“广播”和过滤:

    KStream stream = builder.stream(...)
    
    // each record in `stream` will be "duplicated" and sent to all `filters`
    stream.filter(...).grouByKey().count(); // or aggregate() instead of count()
    stream.filter(...).grouByKey().count();
    // ...
    

    多次使用相同的KStream对象并应用多个运算符(在我们的例子中filter(),每个记录都将“广播”给所有运算符)。请注意,在这种情况下,记录(即对象)不是物理复制的,而是使用相同的输入记录对象来调用每个filter()