有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

并行处理Java 8并行流findFirst

假设我们有一个这样的工人列表:

List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));

我想找到第一个完成工作的工人,所以:

Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);

但是有一个问题,我不想等待所有工人完成他们的工作,然后再找到第一个工人,而是第一个工人在他完成工作后就找到了

public class Test {

    public static void main(String[] args) {
        List<Worker> workers = new ArrayList<>();
        workers.add(new Worker(1));
        workers.add(new Worker(2));
        workers.add(new Worker(3));
        workers.add(new Worker(4));
        workers.add(new Worker(5));
        Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
        if (first != null) {
            System.out.println("id : " + first.id);
        }
    }

    static class Worker {

        int id;

        Worker(int id) {
            this.id = id;
        }

        boolean finish() {
            int t = id * 1000;
            System.out.println(id + " -> " + t);
            try {
                Thread.sleep(t);
            } catch (InterruptedException ignored) {
            }
            return true;
        }

    }

}

有没有办法用java.util.Stream来实现它

谢谢


共 (4) 个答案

  1. # 1 楼答案

    你似乎对^{有严重的误解Stream并不意味着启动工人。事实上,如果你使用findFirst,它可能只启动第一个工人,而不启动任何工人。因此,它也不会等待“所有工作人员完成”,而是只等待当前挂起的线程。但是,由于您有一个相当小的流,因此可能所有的工作线程都已经启动,因为您的环境中有尽可能多的可用线程。但这并不是一个可以保证的行为

    请注意,如果使用顺序流而不是并行流,它肯定只处理第一项(因为它返回true),而不处理其他项。但由于流实现无法预测结果,它将尊重您通过并行执行“加速”操作的请求,并可能开始使用更多线程提前处理更多项

  2. # 2 楼答案

    我知道这是个老问题,但在这里找到了一些不错的解决方案: https://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/

    在调用下:

    Another way of batch-submitting callables is the method invokeAny() which works slightly different to invokeAll(). Instead of returning future objects this method blocks until the first callable terminates and returns the result of that callable.

    将流更改为可调用的集合。看起来很干净

  3. # 3 楼答案

    当您使用finish方法作为流的过滤器时,这意味着为了计算特定工作程序的过滤器谓词,工作程序必须完成其工作

    但是,当您以并行流的形式运行此代码时,可能会同时对多个工作进程应用过滤器,在这种情况下,第一个完成的工作进程将为您提供输出。但是,您无法控制并行流将使用多少线程。它可能会决定一些辅助线程应该在同一个线程上处理,在这种情况下,其中一些根本不会被处理(因为终端操作要求只有一个辅助线程完成其处理)

    因此,如果您的目标是同时对所有worker执行finish,那么就不能使用流(甚至不能使用并行流)

  4. # 4 楼答案

    您可以尝试使用Java的反应式扩展(RxJava)中的Observable,而不是使用Stream。下面是示例代码

    public class Example {
      public static void main(String[] args) {
        Maybe<Worker> workerResult = Observable.fromArray(Worker.run(1), Worker.run(2), Worker.run(3), Worker.run(4), Worker.run(5))
            .flatMap(worker -> (Observable<Worker>) worker)
            .firstElement();
        workerResult.subscribe(onNext -> System.out.println("First worker [" + onNext.toString() + "]"));
    
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    
    class Worker {
      private int id;
      static Observable run(int id) { return Observable.just(new Worker(id)).observeOn(Schedulers.computation()).doOnNext(Worker::process); }
      private Worker(int id) { this.id = id; }
      public void process() {
        try {
          Thread.sleep(new Random().nextInt(2000));
        } catch (InterruptedException e) {
          System.out.println(String.format("[%s] Thread interrupted [%s]", Thread.currentThread(), id));
        }
        System.out.println(String.format("[%s] Worker [%s]", Thread.currentThread(), id));
      }
        public String toString() { return "Worker [" + id + "]"; }
    }
    

    示例输出:

    [Thread[RxComputationThreadPool-2,5,main]] Worker [2]
    [Thread[RxComputationThreadPool-1,5,main]] Thread interrupted [1]
    [Thread[RxComputationThreadPool-1,5,main]] Worker [1]
    [Thread[RxComputationThreadPool-4,5,main]] Thread interrupted [4]
    [Thread[RxComputationThreadPool-3,5,main]] Thread interrupted [3]
    [Thread[RxComputationThreadPool-3,5,main]] Worker [3]
    [Thread[RxComputationThreadPool-4,5,main]] Worker [4]
    First worker [Worker [2]]