使用公共executor服务实例进行多线程Java并发处理
我有n个工作线程从一个kinesis流中检索记录(这对于这个问题并不重要),然后将这些记录推送到一个executor服务,在那里记录被处理并持久化到后端数据库。此同一执行器服务实例用于所有工作线程
现在有一个场景,任何给定的worker循环停止处理记录和块,直到它提交的所有记录都被完全处理。这本质上意味着,对于来自特定工作线程的记录,executor服务中不应存在挂起/正在运行的线程
实现的一个非常简单的示例如下:
工人阶级
public class Worker { Worker(Listener listener){ this.listener = listener; } //called periodically to fetch records from a kinesis stream public void processRecords(Record records) { for (Record record : records) { listener.handleRecord(record); } //if 15 minutes has elapsed, run below code. This is blocking. listener.blockTillAllRecordsAreProcessed() }
}
侦听器类
public class Listener { ExecutorService es; // same executor service is shared across all listeners. Listener(ExecutorService es){ this.es = es; } public void handleRecord(Record record) { //submit record to es and return // non blocking } public boolean blockTillAllRecordsAreProcessed(){ // this should block until all records are processed // no clue how to implement this with a common es } }
我能想到的唯一方法是为每个工作人员提供一个本地执行器服务,并为每个批处理执行类似invokeAll
的操作,这将稍微更改实现,但完成工作。但我觉得应该有更好的方法来解决这个问题
# 1 楼答案
您可以使用CountdownLatch类来阻止,如下所示:
阅读更多信息:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html