java RxJava:将Observable与Completable合并不起作用
我有一个Observable,在某个时刻必须将内容写入缓存——我们希望在Observable上完成整个操作之前等待写入完成(出于报告目的)
出于测试目的,缓存写完成表如下所示:
Completable.create(
emitter ->
new Thread(
() -> {
try {
Thread.sleep(2000);
doSomething();
emitter.onComplete();
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.start());
由于我有几个缓存写入,我尝试将它们合并到一个容器类中:
public class CacheInsertionResultsTracker {
private Completable cacheInsertResultsCompletable;
public CacheInsertionResultsTracker() {
this.cacheInsertResultsCompletable = Completable.complete();
}
public synchronized void add(Completable cacheInsertResult) {
this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
}
public Completable getCompletable() {
return this.cacheInsertResultsCompletable;
}
}
我试着用下面的方式把它和可观察的结合起来:
CacheInsertionResultsTracker tracker = new ...;
observable
.doOnNext(next->tracker.add(next.writeToCache(...)))
.mergeWith(Completable.defer(()->tracker.getCompletable()))
.subscribe(
// on next
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);
我怎样才能确保在finishWithSuccess被称为doSomething时完成? 问题是,每次我添加一个新引用时,Completable引用都会更新,而且它发生在mergeWith运行之后
# 1 楼答案
对于我们的用例来说,似乎有效的解决方案是使用concatWith+defer:
Concat确保对Completable的订阅仅在完成可观察对象之后发生,并且延迟获取最终Completable直到该订阅(因此所有对象都已添加到跟踪器)
# 2 楼答案
根据注释,您可以用
ReplaySubject<Completable>
替换completable cache,执行一些超时以检测不活动,并结束可观察序列编辑:
更新后的示例意味着您希望运行
Completable
以响应主observable
中的项目,并按照该顺序隔离,然后等待所有项目完成。这是flatMap
的典型用例: