java从flux的开始捕获元素,并创建一个新的flux,其中包含捕获的元素和剩余的元素
我正在从事一个基于SpringCloudGateway的项目,我的目标是捕获和记录部分传入和传出的消息。请求日志记录必须在请求传递到后端服务之前完成,并且相同的策略应用于响应。实现应该基于过滤器。我无法控制网关何时订阅产生的流量
简而言之,我想做以下几点:
- 从flux捕获最多x字节的数据
- 记录捕获的数据
- 创建包含捕获数据和剩余数据的流量
到目前为止,我得到了这个-它似乎是工作。我只是想知道,我是否错过了一些东西和/或是否有更好的方法来实现这一点。我相信其他人也遇到过类似的问题:
Flux<Integer> body = Flux.range(1, 50).log(); // Simulate flow of data
ConnectableFlux<Integer> sharedBody = body.publish(1); // Content is already buffered - ideal prefetch would be 0
AtomicLong readCount = new AtomicLong(); // Counter
AtomicReference<Flux<Integer>> partiallyCachedFlux = new AtomicReference<>(); // A hack, not to be used in real world
Flux.from(sharedBody)
.takeUntil(s -> {
System.out.println("C:" + s);
return readCount.incrementAndGet() >= 10; // Store up to 10 elements
})
.collectList()
.subscribe(ints -> {
System.out.println("Collected:" + ints); // Log what we got
partiallyCachedFlux.set(
Flux.concat(Flux.fromIterable(ints).log(), sharedBody)
); // Create a flux that contains captured data and remaining data
});
sharedBody.connect();
Thread.sleep(1000); // Because I was lazy
partiallyCachedFlux.get()
.doOnEach(i -> { if (i.isOnNext()) System.out.println("P:" + i.get());})
.subscribe(); // Show that we have captured everything
# 1 楼答案
与
takeUntil
相反的是skipUntil
。你可以share
将原始通量分成两个通量,一个是takesUntil,另一个是skipsUntil。您的最终结果将是两种流量的Flux.merge
注意,当像这样外部化状态(
AtomicInteger
)时,如果整个Flux
被多次订阅,您将遇到问题。解决这个问题的方法是将所有内容包装到Flux.defer
中,以便在lambda中创建外部状态,从而特定于给定订阅