安卓 Java ExecutorService任务/可调用未取消/中断
我正在使用Java ExecutorService(线程池)执行一项任务&;在特定活动处于前台(可见)时更新UI
问题: 我想要的是当用户切换到另一个活动时,我想要停止/取消所有任务(无论是排队的还是正在运行的)。为此,我必须使用ExecutorService shutdown/shutdownNow方法,或者在调用isDone()检查未来对象状态后,对ExecutorService submit方法返回的未来对象取消(true)。这会将中断对应的线程标志设置为TRUE,我必须在可调用实现中检查该标志(thread.currentThread.isInterrupted()),以确定中断是否退出任务/线程。问题在于,在这两种情况下,我是调用ExecutorService shutdown方法还是调用Future cancel(true)方法,很少有10次,它会将线程中断标志设置为true,这最终会导致内存泄漏等
代码:
线程池单例实现(cancelAll以取消任务和shutdownExecutor以shutdown Executor服务):
private static class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private static ThreadPoolManager instance;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
static {
instance = new ThreadPoolManager();
}
public static void submitItemTest(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void submitTestAll(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
cancelAll();
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
instance.blockingQueue.clear();
for (Future future : instance.queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
instance.queuedFutures.clear();
}
public static void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
instance.executorService.shutdownNow();
}
}
可调用实现(检查中断的正常迭代和if子句):
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
活动/片段顶部实现(用于调用取消任务和关闭):
@Override
public void onStop() {
MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
ThreadPoolManager.cancelAll();
ThreadPoolManager.shutdownExecutor();
super.onStop();
}
更新:
所作的修改:
从使用可运行改为可调用
现在不在ExecutorService中使用singleton
private class ThreadPoolManager { private ExecutorService executorService; private List<Future> queuedFutures; private BlockingQueue<Runnable> blockingQueue; private ThreadPoolManager() { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); queuedFutures = new ArrayList<>(); blockingQueue = new LinkedBlockingDeque<>(); executorService =getNewExecutorService(); } private ExecutorService getNewExecutorService(){ return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); } private void submitItemTest(Runnable runnable) { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); if(executorService.isShutdown()){ executorService=getNewExecutorService(); } Future future = executorService.submit(runnable); queuedFutures.add(future); } private void submitTestAll(Runnable runnable) { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); if(executorService.isShutdown()){ executorService=getNewExecutorService(); } cancelAll(); Future future = executorService.submit(runnable); queuedFutures.add(future); } private void cancelAll() { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); blockingQueue.clear(); for (Future future : queuedFutures) { if (!future.isDone()) { boolean cancelled = future.cancel(true); MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); } } queuedFutures.clear(); } private void shutdownExecutor(){ MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); executorService.shutdownNow(); blockingQueue.clear(); queuedFutures.clear(); } }
找到了罪魁祸首,但还没有找到解决办法。下面2个是可运行文件1的实现,其中1个正在工作(isInterrupted返回true或comes InterupptedException和than task ended),但不是其他
可运行工作(我使用它进行测试):
new Runnable() {
@Override
public void run() {
int i=0;
while(!Thread.currentThread().isInterrupted()){
try {
System.out.println(i);
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
return;
}
i++;
}
}
}
不工作(我要使用的实际代码):
new Runnable(){
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
break;
}
}
}
};
和1个可能的解决方案是使用变量(布尔)作为可运行的中断标志,我将考虑作为最后的手段,但很乐意知道错误。p>
# 1 楼答案
根据{}文档,关闭正在执行的任务是在尽最大努力的基础上完成的
因此,当您调用
ExecutorService.shutdownNow()
时,实现将尝试关闭所有当前正在执行的任务。每个任务将一直运行,直到达到检测到中断的点为止为了确保线程在早期阶段达到该点,最好在循环中添加一个检查,检查线程是否被插入,如下所示:
通过在每次迭代中进行此调用,线程将在距离实际中断较短的时间间隔内检测到中断
因此,修改后的
Callable
代码如下所示:顺便说一下,如果您不打算从
call()
方法返回任何值,那么使用Callable
是没有意义的。如果您的任务中需要参数化类型,只需创建一个参数化的Runnable
,如下所示:# 2 楼答案
解决方案(出路): 因此,最后我继续使用自定义内部标志(布尔值)作为线程中断标志,MyRunnable将在每次迭代中检查该标志(使用自定义标志的runnable的自定义实现,以便与每个runnable关联一个标志)。当需要取消ExecutorService(ThreadPool)下的线程时,我迭代所有未来的对象,并将其关联到MyRunnable,然后将其中断标志(自定义标志)设置为true,这将中断/关闭线程
线程池管理器:
MyRunnable(实现Runnable的抽象类):
MyRunnable(抽象类MyRunnable的实例):
现在,打电话给threadPoolManager。shutdownExecutor()关闭/中断当前运行的所有线程