Java Apache Flink custom trigger with ProcessingTimeSessionWindow
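I'm trying to bucket the objects in an incoming stream based on two criteria:
- If the total number of objects reaches N, bucket them and send them downstream.
- If the time since the last N objects is >= a timeout, bucket them and send them downstream.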
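Taken together, the two criteria describe a count-or-gap batcher. As a hedged illustration of the intended semantics only, here is a plain-Java model with no Flink dependency. The class name `CountOrGapBatcher` and the explicit `now` timestamps are hypothetical, and it assumes "time since" means the processing-time gap since the last element, which is what a session window measures.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Flink) of the two bucketing rules:
 *  1. emit a bucket as soon as it holds maxCount elements;
 *  2. emit a bucket once gapMillis of processing time pass without a new element.
 * Timestamps are passed in explicitly so the behavior is deterministic.
 */
public class CountOrGapBatcher<T> {
    private final int maxCount;
    private final long gapMillis;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();
    private long lastSeen; // only read while the buffer is non-empty

    public CountOrGapBatcher(int maxCount, long gapMillis) {
        this.maxCount = maxCount;
        this.gapMillis = gapMillis;
    }

    /** Called when an element arrives at processing time `now`. */
    public void add(T element, long now) {
        advanceTo(now);                  // rule 2: close a stale bucket first
        buffer.add(element);
        lastSeen = now;
        if (buffer.size() >= maxCount) { // rule 1: count reached
            flush();
        }
    }

    /** Called as processing time advances (in Flink this would be a timer). */
    public void advanceTo(long now) {
        if (!buffer.isEmpty() && now - lastSeen >= gapMillis) {
            flush();
        }
    }

    private void flush() {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public List<List<T>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountOrGapBatcher<String> b = new CountOrGapBatcher<>(3, 10_000);
        b.add("a", 0);
        b.add("b", 1_000);
        b.add("c", 2_000);   // third element: the count rule fires
        b.add("d", 3_000);
        b.advanceTo(13_000); // 10 s since "d": the gap rule fires
        System.out.println(b.emitted()); // [[a, b, c], [d]]
    }
}
```

Keeping both rules in one object mirrors what the combined trigger has to do: the count check runs per element, the gap check runs when time advances.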
Flink provides each of these separately, as CountTrigger and ProcessingTimeSessionWindows.
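I tried to combine the two by writing a custom trigger and extending ProcessingTimeSessionWindows to use it. It fires on the second condition but never on the first. Since the stream is not keyed, I can't use ValueState to store the count, so I'm wondering what alternatives I have.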
Here is the code:
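```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingTimeCountSessionWindow extends ProcessingTimeSessionWindows {
    private static final long serialVersionUID = 786L;

    private final int count;

    private ProcessingTimeCountSessionWindow(int count, long timeout) {
        super(timeout);
        this.count = count;
    }

    // Replace the default ProcessingTimeTrigger with the count-aware trigger.
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeCountTrigger.create(count);
    }

    /**
     * Creates a new session {@link WindowAssigner} that assigns elements to
     * sessions based on the current processing time.
     *
     * @param count Maximum number of elements in a session before it is fired
     * @param size  The session timeout, i.e. the processing-time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeCountSessionWindow withCountAndGap(int count, Time size) {
        return new ProcessingTimeCountSessionWindow(count, size.toMilliseconds());
    }
}
```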
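For context on the assigner above: ProcessingTimeSessionWindows gives every element its own window `[now, now + gap)` and merges windows that overlap, so a session keeps growing while elements arrive less than `gap` apart, and the trigger's timer at `window.maxTimestamp()` (that is, `end - 1`) keeps moving. The plain-Java model below sketches that merging; it uses no Flink classes, the class `SessionMergeModel` is hypothetical, and its `intersects`/`cover` rules are written to mirror Flink's `TimeWindow`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Plain-Java model (no Flink) of processing-time session windows: each element
 * gets a window [now, now + gap), and overlapping windows are merged.
 */
public class SessionMergeModel {
    /** Half-open interval [start, end), shaped like Flink's TimeWindow. */
    public static final class Window {
        public final long start;
        public final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last millisecond that still belongs to the window. */
        public long maxTimestamp() {
            return end - 1;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    private final long gap;
    private final List<Window> sessions = new ArrayList<>();

    public SessionMergeModel(long gap) {
        this.gap = gap;
    }

    /** Assigns an element arriving at processing time `now`; returns its session. */
    public Window assign(long now) {
        Window merged = new Window(now, now + gap);
        // Existing sessions never overlap each other, so one pass finds every
        // session the new window touches.
        for (Iterator<Window> it = sessions.iterator(); it.hasNext(); ) {
            Window s = it.next();
            if (s.intersects(merged)) {
                merged = merged.cover(s);
                it.remove();
            }
        }
        sessions.add(merged);
        return merged;
    }

    public static void main(String[] args) {
        SessionMergeModel model = new SessionMergeModel(10_000);
        System.out.println(model.assign(0));      // [0, 10000)
        Window grown = model.assign(4_000);       // overlaps the first session, so they merge
        System.out.println(grown + " timer at " + grown.maxTimestamp()); // [0, 14000) timer at 13999
        System.out.println(model.assign(30_000)); // [30000, 40000): gap exceeded, new session
    }
}
```

The key point for the trigger is that each merge produces a *new* window object, so anything the trigger stores per window has to be carried over explicitly when windows merge.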
The custom trigger is shown below. CountTrigger uses a ReducingState, but my stream is not keyed, so that doesn't work:
```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingTimeCountTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 786L;

    private final int maxCount;

    // Per-window element count, summed on every add().
    private final ReducingStateDescriptor<Integer> countStateDesc =
            new ReducingStateDescriptor<>("window-count", new ReduceFunctions.IntSum(), IntSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Timeout condition: fire when the session ends.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        // Count condition: fire once maxCount elements have arrived.
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        count.add(1);
        if (count.get() >= maxCount) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDesc).clear();
    }

    public static ProcessingTimeCountTrigger create(int maxCount) {
        return new ProcessingTimeCountTrigger(maxCount);
    }

    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" + maxCount + ")";
    }
}
```
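A likely reason the count condition never fires, offered as a hedged reading: with session windows, every element is first assigned its own window and then merged into the open session, so it lands in a new window object each time. Trigger state is scoped per window, and Flink's own CountTrigger carries its count across merges by calling `ctx.mergePartitionedState(stateDesc)` in `onMerge` (and calls `count.clear()` before firing). The `onMerge` above only re-registers the timer, so the merged window's count starts again from zero. On the keyed-state concern: as far as I know, `windowAll` internally keys the stream by a single constant key, so partitioned trigger state such as ReducingState is still available on a non-keyed stream. The plain-Java model below (hypothetical class `MergeCountModel`, no Flink dependency) simulates both variants.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (no Flink) of a per-window count when session windows merge.
 * mergeCountOnMerge=true mimics ctx.mergePartitionedState(...) in onMerge;
 * false mimics an onMerge that only re-registers the timer.
 */
public class MergeCountModel {
    private final int maxCount;
    private final long gap;
    private final boolean mergeCountOnMerge;
    // Count per window, keyed by the window's end timestamp (stand-in for trigger state).
    private final Map<Long, Integer> countByWindow = new HashMap<>();
    private Long openSessionEnd = null;
    private int fires = 0;

    public MergeCountModel(int maxCount, long gap, boolean mergeCountOnMerge) {
        this.maxCount = maxCount;
        this.gap = gap;
        this.mergeCountOnMerge = mergeCountOnMerge;
    }

    public void onElement(long now) {
        long newEnd = now + gap;
        if (openSessionEnd != null && now <= openSessionEnd) {
            // The element's window [now, newEnd) overlaps the open session: merge.
            int carried = mergeCountOnMerge ? countByWindow.getOrDefault(openSessionEnd, 0) : 0;
            countByWindow.remove(openSessionEnd); // the merged-away window's state is cleared
            countByWindow.put(newEnd, carried);
        } else {
            countByWindow.put(newEnd, 0);         // a brand-new session
        }
        openSessionEnd = newEnd;
        int count = countByWindow.merge(newEnd, 1, Integer::sum);
        if (count >= maxCount) {
            countByWindow.put(newEnd, 0); // reset after firing, like CountTrigger's count.clear()
            fires++;
        }
    }

    public int fires() {
        return fires;
    }

    public static void main(String[] args) {
        MergeCountModel withoutMerge = new MergeCountModel(3, 10_000, false);
        MergeCountModel withMerge = new MergeCountModel(3, 10_000, true);
        for (long t = 0; t < 6_000; t += 1_000) { // six elements, 1 s apart, one session
            withoutMerge.onElement(t);
            withMerge.onElement(t);
        }
        System.out.println(withoutMerge.fires()); // 0
        System.out.println(withMerge.fires());    // 2
    }
}
```

In Flink terms, the `true` branch corresponds to adding `ctx.mergePartitionedState(countStateDesc)` to `onMerge`, which CountTrigger already does; that may be why copying CountTrigger, as the second answer does, works.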
# Answer 1
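Does AsadSalik's code actually work? The default ProcessingTimeTrigger also registers a timer that fires the window when its max timestamp is reached. Here is its source code: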
# Answer 2
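I was able to solve this by copy-pasting CountTrigger and overriding the following:
I also didn't need to extend ProcessingTimeSessionWindows, because I can use the custom trigger I created directly. Unfortunately, CountTrigger can't be extended because its constructor is private; otherwise that would be the best solution.
The final code looks like this:
This sends the bucketed data downstream once we have 10 elements, or once 10 seconds have passed since we last saw an element.
The custom trigger code looks like this: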