javaapacheflink如何映射和匹配一个带有主键的alternatekey到一个键控流
我想要一个更简单、更好、更优雅的方法来解决下面的问题。我还没有看到关于这个主题的任何文档,我确信我目前的方法有一些瓶颈,谢谢
我有一个将Json映射到POJO的流
DataStream<MYPOJO> stream = env.
addSource( <<kafkaSource>>).map(new EventToPOJO());
POJO的一些字段将有一个填充的主键,一些字段将有一个填充的备用键,一些字段将同时具有这两个键。我在Flink文档中找到的使用两个键的唯一示例是,对复合键使用keyselector,而对备用键则不使用keyselector
我目前的做法如下:
- 使用RichFlatMap函数将主键的所有元素收集到流中,Astream
- 使用RichFlatMap函数将备用密钥的所有元素收集到流中,b流
- 对同时具有主键和备用键的项目使用richFlatMap,CStream
- 使用主键上的Cstream连接Astream
- 将B流与备用键上的Cstream连接起来
- 最后,按主键输入
DataStream<MyPOJO> primaryKey = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
@Override
public void flatMap(MyPOJO mypojo, Collector<MyPOJO> collector) throws Exception {
if(mypojo.PrimaryKey() != null){
collector.collect(MyPOJO);
}
}
});
DataStream<MyPOJO> alternateKey = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
@Override
public void flatMap(MyPOJO mypojo, Collector<MyPOJO> collector) throws Exception {
if(mypojo.getAlternateKey() != null){
collector.collect(mypojo);
}
}
});
DataStream<MyPOJO> both = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
@Override
public void flatMap(MyPOJO mypojo, Collector<MYPOJO> collector) throws Exception {
if(mypojo.getAlternateKey() != null && mypojo.getPrimaryKey() !=null ){
collector.collect(mypojo);
}
}
});
//Join them
both.join(alternateKey)
.where(MyPOJO::getAlternateKey)
.equalTo(MyPOJO::getAlternateKey)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
.apply (new JoinFunction<MyPOJO, MyPOJO, MyPOJO>(){
@Override
public StateObject join(MyPOJO Mypojo, MyPOJO mypojo2) throws Exception {
// Some Join logic to keep both states
return stateObject2;
}
});
:: repeat for primary key stream ...
// keyby at the end
both.keyBy(MyPOJO::getPrimaryKey)
我相信我也可以使用一个过滤函数来实现这3个流,但我不想在第一时间分裂成3个流,请不要,为了可读性,我已经简化了上面的内容,所以请不要介意你可能发现的任何语法错误
# 1 楼答案
您应该实现一个包含主&;辅助键。它需要有
equals()
和hashCode()
方法,当两个记录相等时,它们实现所需的逻辑(*)。看见 hashCode() and equals() method for custom classes in flink了解有关为什么必须这样做的更多详细信息添加一个
MyPOJO.getJoiningKey()
返回此自定义POJO然后只需基于
.where(r -> r.getJoiningKey()).equals(r -> r.getJoiningKey())
进行一次连接(*)我还是不确定你想要什么样的逻辑。例如,如果左侧初级和;次键不为空,右侧主键为空,但次键不为空,您想比较什么