有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java反应器未完成不一致性

我肯定我错过了什么。我正在运行以下代码:

@Test
public void simpleCreation() throws Exception {
    Iterator<String> data = ImmutableList.of("1", "2", "3").iterator();
    Flux<String> stringFlux = Flux.create(emmiter -> {
        while ( data.hasNext() ) {
            emmiter.next(data.next());
        }
        emmiter.complete();
    });
    ConnectableFlux<String> connectableFlux = stringFlux.publish();

    connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete"));
    stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete"));

    CountDownLatch completeLatch = new CountDownLatch(1);
    Disposable disposable = connectableFlux.subscribe(s -> {
        System.out.println("subscribe: data: " + s);
    }, error -> { }, completeLatch::countDown);

    connectableFlux.connect();

    completeLatch.await();
    disposable.dispose();
}

并期望它打印“connectableFlux.doOnComplete”或“stringFlux.doOnComplete”或两者,但我看不到。从subscribe方法执行OnComplete回调没有问题,但是这两个方法都没有被调用,我也不太明白为什么

对我来说,这看起来有点不一致——在一个地方调用回调,而其他地方则被忽略。我可以观察到doOnNext的类似行为

如果有人能解释这背后的概念,我将不胜感激。我确信这不是bug,只是我在框架或总体概念上遗漏了一些东西


共 (2) 个答案

  1. # 1 楼答案

    我不能加上这个作为评论,所以很抱歉打断了一个老问题。我只是想分享一下官方的反应堆指南:

    B.2. I used an operator on my Flux but it doesn’t seem to apply. What gives? Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.

    Reactor operators are decorators. They return a different instance that wraps the source sequence and add behavior. That is why the preferred way of using operators is to chain the calls.

    Compare the following two examples:

    without chaining (incorrect)

    Flux<String> flux = Flux.just("foo", "chain");
    flux.map(secret -> secret.replaceAll(".", "*")); 
    flux.subscribe(next -> System.out.println("Received: " + next));
    The mistake is here. The result isn’t attached to the flux variable.
    

    without chaining (correct)

    Flux<String> flux = Flux.just("foo", "chain");
    flux = flux.map(secret -> secret.replaceAll(".", "*"));
    flux.subscribe(next -> System.out.println("Received: " + next));
    

    This sample is even better (because it’s simpler):

    with chaining (best)

    Flux<String> secrets = Flux
      .just("foo", "chain")
      .map(secret -> secret.replaceAll(".", "*"))
      .subscribe(next -> System.out.println("Received: " + next));
    

    The first version will output:

    Received: foo
    Received: chain
    

    Whereas the two other versions will output the expected:

    Received: ***
    Received: *****
    

    https://projectreactor.io/docs/core/release/reference/#faq.chain

  2. # 2 楼答案

    此线路导致以下问题:

    connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete"));
    

    调用doOnComplete()的结果被忽略。该方法返回要在其上调用subscribe()的Flux实例的新版本,但不会将逻辑添加到旧的connectableFlux实例

    试着这样做:

    Iterator<String> data = ImmutableList.of("1", "2", "3").iterator();
    Flux<String> stringFlux = Flux.create(emmiter -> {
        while (data.hasNext()) {
            emmiter.next(data.next());
        }
        emmiter.complete();
    });
    
    stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete()"))
            .subscribe(s -> System.out.println("subscribe: data: " + s), error -> {})
            .dispose();
    
    stringFlux.publish().connect();