有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何实现或找到threadsafe CompletionService的等价物?

我在Tomcat容器中运行了一个简单的web服务,它本质上是多线程的。在进入服务的每个请求中,我希望对外部服务进行并发调用。java中的ExecutorCompletionService。util。我有一部分是在那里。我可以为它提供一个线程池,它将负责执行我的并发调用,并在任何结果就绪时通知我

处理特定传入请求的代码可能如下所示:

void handleRequest(Integer[] input) {
    // Submit tasks
    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(Executors.newCachedThreadPool());
    for (final Integer i : input) {
        completionService.submit(new Callable<Integer>() {
            public Integer call() {
                return -1 * i;
            }
        });
    }

    // Do other stuff...

    // Get task results
    try {
        for (int i = 0; i < input.size; i++) {
            Future<Integer> future = completionService.take();
            Integer result = future.get();
            // Do something with the result...
        }
    } catch (Exception e) {
        // Handle exception
    }
}

这应该可以很好地工作,但是效率很低,因为每个传入请求都分配了一个新的线程池。如果将CompletionService作为共享实例移出,我将遇到线程安全问题,多个请求共享同一CompletionService和线程池。当请求提交任务并得到结果时,它们得到的结果不是它们提交的结果

因此,我需要一个线程安全的CompletionService,它允许我在所有传入请求之间共享一个公共线程池。当每个线程完成一项任务时,应该通知传入请求的相应线程,以便它可以收集结果

实现这种功能最直接的方法是什么?我相信这种模式已经应用了很多次;我只是不确定这是否是Java并发库提供的,或者是否可以使用一些Java并发构建块轻松构建

更新:我忘记提到的一个警告是,我希望在完成任何提交的任务后尽快收到通知。这是使用CompletionService的主要优势,因为它将任务和结果的生产和消费解耦。实际上,我并不关心返回结果的顺序,我希望在等待结果按顺序返回时避免不必要的阻塞


共 (4) 个答案

  1. # 1 楼答案

    你为什么需要一个CompletionService

    每个线程都可以简单地提交或调用Callables的一个ExecutorService的“常规”共享实例。然后,每个线程都保留自己的私有Future引用

    而且,Executor及其子代在设计上是线程安全的。你真正想要的是,每个线程都可以创建自己的任务并检查它们的结果

    {}中的Javadoc非常优秀;它包括使用模式和示例。阅读ExecutorService和其他类型的文档,更好地了解如何使用它们

  2. # 2 楼答案

    您共享Executor,但不共享CompletionService

    我们有一个AsyncCompleter可以做到这一点,并处理所有簿记工作,允许您:

    Iterable<Callable<A>> jobs = jobs();
    Iterable<A> results async.invokeAll(jobs);
    

    results按返回和块的顺序迭代,直到结果可用

  3. # 3 楼答案

    爪哇。util。concurrent提供您所需的一切。如果我正确理解了你的问题,你有以下要求:

    您希望提交请求,并立即(在合理范围内)处理请求结果(响应)。我相信你已经看到了解决问题的方法:java。util。同时发生的完成服务

    该服务相当简单,它将执行器和阻塞队列结合起来,以处理可运行和/或可调用的任务。BlockingQueue用于保存已完成的任务,您可以让另一个线程等待,直到完成的任务在CompletionService对象上排队(这是通过调用take()完成的)

    如前所述,共享Executor,并根据请求创建CompletionService。这看起来像是一件昂贵的事情,但是请再次考虑CS只是与执行器和阻塞队列协作。由于您共享的是最昂贵的实例化对象,即执行器,我认为您会发现这是一个非常合理的成本

    然而。。。尽管如此,您似乎仍然有一个问题,这个问题似乎是请求处理与响应处理的分离。这可以通过创建一个单独的服务来实现,该服务专门处理所有请求或特定类型请求的响应

    下面是一个例子: (注意:这意味着请求对象实现是可调用接口,它应该返回一个响应类型……在这个简单的示例中,我省略了这些细节)

    class RequestHandler {
    
      RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) {      
        this.responseQueue = ...
        this.executor = ...
      }  
    
      public void acceptRequest(List<Request> requestList) {
    
        for(Request req : requestList) {
    
          Response response = executor.submit(req);
          responseHandler.handleResponse(response);
    
        }  
      }  
    }
    
    class ResponseHandler {
      ReentrantLock lock;
      ResponseHandler(ExecutorService responseExecutor) {
        ...
      }
    
      public void handleResponse(Response res) {
        lock.lock() {
        try {
          responseExecutor.submit( new ResponseWorker(res) );
        } finally {
          lock.unlock();
        }    
      }
    
      private static class ResponseWorker implements Runnable {
    
        ResponseWorker(Response response) {
          response = ...
        }
    
        void processResponse() {         
          // process this response 
        }
    
        public void run() {      
          processResponse();      
        }  
      }
    }
    

    有几件事需要记住:第一,ExecutorService执行阻塞队列中的可调用或可运行项;您的RequestHandler接收任务,这些任务在Executor上排队,并尽快处理。同样的事情也发生在你的负责人身上;收到响应后,该独立执行人将尽快处理该响应。简而言之,有两个执行器同时工作:一个在请求对象上,另一个在响应对象上

  4. # 4 楼答案

    你可以使用普通的共享服务。无论何时提交任务,您都将获得刚刚提交的任务的未来。您可以将它们全部存储在一个列表中,稍后再进行查询

    例如:

    private final ExecutorService service = ...//a single, shared instance
    
    void handleRequest(Integer[] input) {
        // Submit tasks
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(input.length);
        for (final Integer i : input) {
            Future<Integer> future = service.submit(new Callable<Integer>() {
                public Integer call() {
                    return -1 * i;
                }
            });
            futures.add(future);
        }
    
        // Do other stuff...
    
        // Get task results
        for(Future<Integer> f : futures){
            try {
                Integer result = f.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }