java复杂聚合
我有一个主题中的数据需要在多个级别进行统计,所有代码和文章都只提到单词统计示例
数据的一个例子是:
序列号:123 国家:美国 日期:2018年5月1日 州:纽约 城市:纽约 参观人数:5人
序列号:123 国家:美国 日期:2018年6月1日 州:纽约 城市:皇后区 参观者:10人
序列号:456 日期:2018年6月1日 国家:美国 州:纽约 城市:皇后区 访客:27
序列号:123 日期:2018年6月1日 国家:美国 州:纽约 城市:纽约 参观者:867
我已经做了过滤,但聚合? 抱歉,Java 8和&;混合,我更喜欢8,但要同时学习
KTable<String, CountryVisitorModel> countryStream1 = inStream
.filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
.groupBy((key, value) -> value.serial)
.aggregate(
new Initializer<CountryVisitorModel>() {
public CountryVisitorModelapply() {
return new CountryVisitorModel();
}
},
new Aggregator<String, InputModel, CountryVisitorModel>() {
@Override
public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {
aggregate.serial = value.serial;
aggregate.country_name = value.country_name;
aggregate.city_name = value.city_name;
aggregate.country_count++;
aggregate.city_count++;
aggregate.ip_count++;
//
return aggregate;
}
},
Materialized.with(stringSerde, visitorSerde));
对于所有相同的序列号(这将是分组依据) 计算每个站点的访客总数:
国家/州/城市游客总数
# 1 楼答案
如果每个记录只贡献一个计数,我建议
branch()
流和每个子流的计数:如果分支不起作用,因为一条记录需要进行多次计数,您可以“广播”和过滤:
多次使用相同的
KStream
对象并应用多个运算符(在我们的例子中filter()
,每个记录都将“广播”给所有运算符)。请注意,在这种情况下,记录(即对象)不是物理复制的,而是使用相同的输入记录对象来调用每个filter()