有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RxJava:将Observable与Completable合并不起作用

我有一个Observable,在某个时刻必须将内容写入缓存——我们希望在Observable上完成整个操作之前等待写入完成(出于报告目的)

出于测试目的,缓存写完成表如下所示:

   Completable.create(
                  emitter ->
                      new Thread(
                              () -> {
                                try {
                                  Thread.sleep(2000);
                                  doSomething();
                                  emitter.onComplete();
                                } catch (InterruptedException e) {
                                  e.printStackTrace();
                                }
                              })
                          .start());

由于我有几个缓存写入,我尝试将它们合并到一个容器类中:

public class CacheInsertionResultsTracker {

  private Completable cacheInsertResultsCompletable;

  public CacheInsertionResultsTracker() {
    this.cacheInsertResultsCompletable = Completable.complete();
  }

  public synchronized void add(Completable cacheInsertResult) {
    this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
  }

  public Completable getCompletable() {
    return this.cacheInsertResultsCompletable;
  }
}

我试着用下面的方式把它和可观察的结合起来:

CacheInsertionResultsTracker tracker = new ...;
    observable
        .doOnNext(next->tracker.add(next.writeToCache(...)))
        .mergeWith(Completable.defer(()->tracker.getCompletable()))
        .subscribe(
            // on next
            this::logNextElement
            // on error
            this::finishWithError
            // on complete
            this::finishWithSuccess
            );

我怎样才能确保在finishWithSuccess被称为doSomething时完成? 问题是,每次我添加一个新引用时,Completable引用都会更新,而且它发生在mergeWith运行之后


共 (2) 个答案

  1. # 1 楼答案

    对于我们的用例来说,似乎有效的解决方案是使用concatWith+defer:

    observable
        .doOnNext(next->tracker.add(next.writeToCache(...)))
        .concatWith(Completable.defer(()->tracker.getCompletable()))
        .subscribe(
            // on next
            this::logNextElement
            // on error
            this::finishWithError
            // on complete
            this::finishWithSuccess
            );
    

    Concat确保对Completable的订阅仅在完成可观察对象之后发生,并且延迟获取最终Completable直到该订阅(因此所有对象都已添加到跟踪器)

  2. # 2 楼答案

    根据注释,您可以用ReplaySubject<Completable>替换completable cache,执行一些超时以检测不活动,并结束可观察序列

     ReplaySubject<Completable> cache = ReplaySubject.create();
    
     cache.onNext(completable);
    
     observable.mergeWith(
         cache.flatMapCompletable(v -> v)
              .timeout(10, TimeUnit.MILLISECONDS, Completable.complete())
     )
    

    编辑:

    更新后的示例意味着您希望运行Completable以响应主observable中的项目,并按照该顺序隔离,然后等待所有项目完成。这是flatMap的典型用例:

      observable.flatMap(
           next -> next.writeToCache(...).andThen(Observable.just(next))
      )
      .subscribe(
            this::logNextElement
            // on error
            this::finishWithError
            // on complete
            this::finishWithSuccess
       );