有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何等待JavaRx2 Flowable完成其所有任务?

我正在努力学习RxJava2库的基础知识,现在我陷入了以下困境:
我已经通过Flowable.generate(...)生成了myFlowable,现在我需要等待所有任务完成执行,然后才能继续
下面是显示问题的代码:

myFlowable.parallel()
            .runOn(Schedulers.computation())
            .map(val -> myCollection.add(val))
            .sequential()
            .subscribe(val -> {
                System.out.println("Thread from subscribe: " + Thread.currentThread().getName());
                System.out.println("Value from subscribe: " + val.toString());
            });
    System.out.println("Before sleep - Number of objects: " + myCollection.size());
    try {
        Thread.sleep(1000);
        System.out.println("After sleep - Number of objects: " + myCollection.size());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

我运行所有任务,并将结果添加到集合中。如果我在myFlowable块之后检查集合大小,那么它将不同,如果我在小Thread.sleep()之后检查它。有没有办法检查所有的任务都完成了,我们可以继续?任何帮助或指导都将不胜感激


共 (2) 个答案

  1. # 1 楼答案

    可以使用AtomicBoolean,将其初始化为false,然后使用doFinally()将其设置为true

    doFinally()在可观测信号onError或onCompleted或被下游处理后调用

    然后休眠主线程,直到completed值为真

    以你的例子:

    AtomicBoolean completed = new AtomicBoolean(false);
    
    myFlowable.parallel()
                .runOn(Schedulers.computation())
                .map(val -> myCollection.add(val))
                .sequential()
                .doFinally(() -> completed.set(true))
                .subscribe(val -> {
                    ...
                });
        ...
    try {
       while(!completed.get()){
           Thread.sleep(1000);
           ...
       }
      ...
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    
  2. # 2 楼答案

    由于RxJava是异步的,下面的java代码observable将运行,而observable将在另一个线程中运行。这就是为什么如果Flowable已经完成了数据的发送,那么您应该在RxJava流中得到通知。为此你有一个接线员。完美无缺 这里有一个如何检测流何时结束的示例

            Flowable.range(0, 100).parallel()
                .runOn(Schedulers.computation())
                .map(integer -> {
    
                    return integer;
                })
                .sequential()
                .doOnComplete(() -> {
                    System.out.println("finished");
                })
                .subscribe(integer -> System.out.println(integer));