有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程使用错误管理在Java中运行多线程

我的工作界面如下:

public interface IJob {
    public void execute();
}

在我的应用程序中,我有多个类实现此接口,如IJob1和IJob2:

public class IJob1 implements IJob{
@Override
public void execute(){
    System.out.println("IJob1\n");
    }
}

public class IJob2 implements IJob{
@Override
public void execute(){
    System.out.println("IJob2\n");
    }
}

因为要运行的作业数量稳步增加,所以我想创建一个新作业,它获取IJob实例列表并并行运行它们。新实现用于并行运行作业的线程数量应该是可配置的。如果其中一个作业引发异常,则当前运行的所有其他作业也应停止,并且execute()方法应将异常传递给调用方

我写了这个,但我无法运行作业并检查是否有错误:

import java.util.LinkedList;

public class WorkQueue
{
    private final int nThreads;
    private final IJob[] threads;
    private final LinkedList queue;

    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList();
        threads = new IJob[nThreads];

        for (int i=0; i<nThreads; i++) {
            threads[i] = new IJob();
            threads[i].execute();
        }
    }

    public void execute(Runnable r) {
    synchronized(queue) {
        queue.addLast(r);
        queue.notify();
    }
}

private class PoolWorker extends Thread {
    public void run() {
        Runnable r;

        while (true) {
            synchronized(queue) {
                while (queue.isEmpty()) {
                    try
                    {
                        queue.wait();
                    }
                    catch (InterruptedException ignored)
                    {
                    }
                }

                r = (Runnable) queue.removeFirst();
            }

            // If we don't catch RuntimeException, 
            // the pool could leak threads
            try {
                r.run();
            }
            catch (RuntimeException e) {
                // You might want to log something here
            }
        }
    }
}
}

你能给我一些继续下去的帮助吗? 多谢各位


共 (2) 个答案

  1. # 1 楼答案

    我建议使用托管线程池。典型的模式是使用Java Executors来获得ExecutorService。ExecutorService通常由线程池和作业队列实现。在ExecutorService上,有像shutdownNow()这样的方法“尝试停止所有正在执行的任务”。这听起来像你想做的

    例如

    List<Callable<Result>> tasks = new ArrayList<>();
    for (final Object job: jobs) {
        final Callable<Result> task = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    // Do job here
                    return result;
                }
            };
        tasks.add(task);
    }
    final numOfThreads = 20; 
    final ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
    try {
        executor.invokeAll(tasks);
    } finally {
        executor.shutdownNow();
    }
    
  2. # 2 楼答案

    I want to create a new job, which takes a list of IJob instances and runs them in parallel. The amount of threads that the new implementation is using to run the jobs in parallel should be configurable. If one of the jobs throws an exception, all other currently running jobs should be stopped as well and the execute() method should pass the exception to the caller.

    这就是使用parallelStream()将使您的生活更简单的地方。你能行

    list.parallelStream().
        .forEach(IJob::execute);
    

    您可能会发现根本不需要框架,可以将这些代码合并到调用者中。e、 g

    Map<String, T> map = dataSet.parallelStream()
                                .map(t -> t.transform1())
                                .filter(t -> t.isGood())
                                .collect(Collectors.groupingByConcurrent(t.getKey()));