有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

使用公共executor服务实例进行多线程Java并发处理

我有n个工作线程从一个kinesis流中检索记录(这对于这个问题并不重要),然后将这些记录推送到一个executor服务,在那里记录被处理并持久化到后端数据库。此同一执行器服务实例用于所有工作线程

现在有一个场景,任何给定的worker循环停止处理记录和块,直到它提交的所有记录都被完全处理。这本质上意味着,对于来自特定工作线程的记录,executor服务中不应存在挂起/正在运行的线程

实现的一个非常简单的示例如下:

  1. 工人阶级

    public class Worker {
    
    Worker(Listener listener){
        this.listener = listener;
    }
    
    //called periodically to fetch records from a kinesis stream
    public void processRecords(Record records) {
    
        for (Record record : records) {
            listener.handleRecord(record);
        }
    
        //if 15 minutes has elapsed, run below code. This is blocking.
        listener.blockTillAllRecordsAreProcessed()
    }
    

    }

  2. 侦听器类

    public class Listener {
    
        ExecutorService es;
    
        // same executor service is shared across all listeners.
        Listener(ExecutorService es){
            this.es = es;
        }
    
        public void handleRecord(Record record) {
            //submit record to es and return
            // non blocking
        }
    
        public boolean blockTillAllRecordsAreProcessed(){
            // this should block until all records are processed
            // no clue how to implement this with a common es
        }
    
    }
    

我能想到的唯一方法是为每个工作人员提供一个本地执行器服务,并为每个批处理执行类似invokeAll的操作,这将稍微更改实现,但完成工作。但我觉得应该有更好的方法来解决这个问题


共 (1) 个答案

  1. # 1 楼答案

    您可以使用CountdownLatch类来阻止,如下所示:

    public void processRecords(List<Record> records) {
     CountDownLatch latch = new CountDownLatch(records.size());
     for (Record record : records) {
         listener.handleRecord(record, latch);
     }
    
     //if 15 minutes has elapsed, run below code. This is blocking.
     listener.blockTillAllRecordsAreProcessed(latch)
     } 
    
    public class Listener {
     ExecutorService es;
     ...
     public void handleRecord(Record record, CountDownLatch latch) {
         //submit record to es and return
         // non blocking
         es.submit(()->{
            someSyncTask(record);
            latch.countDown();
            
        })
     }
    
     public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
         System.out.println("waiting for processes to complete....");
         try {
              //current thread will get notified if all chidren's are done 
              // and thread will resume from wait() mode.
              latch.await();
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
           }
    
       }
    
    

    阅读更多信息:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html