Java反应器未完成不一致性
我肯定我错过了什么。我正在运行以下代码:
@Test
public void simpleCreation() throws Exception {
Iterator<String> data = ImmutableList.of("1", "2", "3").iterator();
Flux<String> stringFlux = Flux.create(emmiter -> {
while ( data.hasNext() ) {
emmiter.next(data.next());
}
emmiter.complete();
});
ConnectableFlux<String> connectableFlux = stringFlux.publish();
connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete"));
stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete"));
CountDownLatch completeLatch = new CountDownLatch(1);
Disposable disposable = connectableFlux.subscribe(s -> {
System.out.println("subscribe: data: " + s);
}, error -> { }, completeLatch::countDown);
connectableFlux.connect();
completeLatch.await();
disposable.dispose();
}
并期望它打印“connectableFlux.doOnComplete”或“stringFlux.doOnComplete”或两者,但我看不到。从subscribe方法执行OnComplete回调没有问题,但是这两个方法都没有被调用,我也不太明白为什么
对我来说,这看起来有点不一致——在一个地方调用回调,而其他地方则被忽略。我可以观察到doOnNext的类似行为
如果有人能解释这背后的概念,我将不胜感激。我确信这不是bug,只是我在框架或总体概念上遗漏了一些东西
# 1 楼答案
我不能加上这个作为评论,所以很抱歉打断了一个老问题。我只是想分享一下官方的反应堆指南:
https://projectreactor.io/docs/core/release/reference/#faq.chain
# 2 楼答案
此线路导致以下问题:
调用
doOnComplete()
的结果被忽略。该方法返回要在其上调用subscribe()
的Flux实例的新版本,但不会将逻辑添加到旧的connectableFlux
实例试着这样做: