有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在RxJava中立即获取订阅服务器中其他线程中的所有元素?

如果我执行这样的代码:

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    Flowable.fromIterable(Lists.newArrayList(1, 2, 3, 4, 5, 6))
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    System.out.println(stopWatch + " value:" + integer);
                    Thread.sleep(1000);
                }
            });
    Thread.sleep(100000);

在输出中,我会在睡眠时间之后得到每个元素,如下所示:

00:00:00.027 value:1
00:00:01.030 value:2
00:00:02.030 value:3
00:00:03.031 value:4
00:00:04.031 value:5
00:00:05.031 value:6

但据我所知,如果我使用调度程序。io(),然后我必须并行地得到所有的值,我希望,我会立即得到所有的值,然后我会只等待1000个mills,仅此而已

像这样:

00:00:00.027 value:1
00:00:00.030 value:2
00:00:00.030 value:3
00:00:00.031 value:4
00:00:00.031 value:5
00:00:00.031 value:6

我怎样才能把它们都放在其他线程中,而不是一个接一个

我不想让他们彼此等待

我尝试了Schedulers.computation()和其他方法,但它们仍然一个接一个地到达

如何立即获得所有这些

附言。 谷歌上有一些文字可以更好地搜索。我是从浏览器历史中得到的。 rxjava如何订阅,rxjava如何同步发布所有元素,只有一个线程活动rxjava,可从多个线程中流动,不在多个线程中工作


共 (1) 个答案

  1. # 1 楼答案

    if i use Schedulers.io(), than i must get all the values parallel

    没有。默认情况下,RxJava流是连续的,这意味着项目是一个接一个地交付的。如果您的消费者阻塞或睡眠,后续项目每次都会延迟

    How can i get them all in other threads, NOT one by one?

    使用parallel

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    Flowable.fromIterable(Lists.newArrayList(1, 2, 3, 4, 5, 6))
            .parallel()
            .runOn(Schedulers.io())
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    System.out.println(stopWatch + " value:" + integer);
                    Thread.sleep(1000);
                }
            })
            .sequential()
            .subscribe();
    
    Thread.sleep(100000);
    

    推荐阅读:https://github.com/ReactiveX/RxJava#parallel-processing