有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

安卓 Java ExecutorService任务/可调用未取消/中断

我正在使用Java ExecutorService(线程池)执行一项任务&;在特定活动处于前台(可见)时更新UI

问题: 我想要的是当用户切换到另一个活动时,我想要停止/取消所有任务(无论是排队的还是正在运行的)。为此,我必须使用ExecutorService shutdown/shutdownNow方法,或者在调用isDone()检查未来对象状态后,对ExecutorService submit方法返回的未来对象取消(true)。这会将中断对应的线程标志设置为TRUE,我必须在可调用实现中检查该标志(thread.currentThread.isInterrupted()),以确定中断是否退出任务/线程。问题在于,在这两种情况下,我是调用ExecutorService shutdown方法还是调用Future cancel(true)方法,很少有10次,它会将线程中断标志设置为true,这最终会导致内存泄漏等

代码:

线程池单例实现(cancelAll以取消任务和shutdownExecutor以shutdown Executor服务):

private static class ThreadPoolManager {

    private ExecutorService executorService;
    private List<Future> queuedFutures;
    private BlockingQueue<Runnable> blockingQueue;

    private static ThreadPoolManager instance;

    private ThreadPoolManager() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
        queuedFutures = new ArrayList<>();
        blockingQueue = new LinkedBlockingDeque<>();
        executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
    }

    static {
        instance = new ThreadPoolManager();
    }

    public static void submitItemTest(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void submitTestAll(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        cancelAll();
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void cancelAll() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
        instance.blockingQueue.clear();
        for (Future future : instance.queuedFutures) {
            if (!future.isDone()) {
                boolean cancelled = future.cancel(true);
                MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
            }
        }
        instance.queuedFutures.clear();
    }

    public static void shutdownExecutor(){
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
        instance.executorService.shutdownNow();
    }
}

可调用实现(检查中断的正常迭代和if子句):

private Callable<Object> getTestAllCallable() {
        return new Callable<Object>() {
            @Override
            public Object call() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {
                          //someWork

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    }
                }
                return null;
            }
        };
    }

活动/片段顶部实现(用于调用取消任务和关闭):

@Override
public void onStop() {
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
    ThreadPoolManager.cancelAll();
    ThreadPoolManager.shutdownExecutor();
    super.onStop();
}

更新:

所作的修改:

  1. 从使用可运行改为可调用

  2. 现在不在ExecutorService中使用singleton

      private class ThreadPoolManager {
    
        private ExecutorService executorService;
        private List<Future> queuedFutures;
        private BlockingQueue<Runnable> blockingQueue;
    
        private ThreadPoolManager() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new ArrayList<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService =getNewExecutorService();
        }
    
        private ExecutorService getNewExecutorService(){
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        }
    
        private void submitItemTest(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void submitTestAll(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void cancelAll() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures) {
                if (!future.isDone()) {
                    boolean cancelled = future.cancel(true);
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
                }
            }
            queuedFutures.clear();
        }
    
        private void shutdownExecutor(){
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
            executorService.shutdownNow();
            blockingQueue.clear();
            queuedFutures.clear();
        }
    }
    

找到了罪魁祸首,但还没有找到解决办法。下面2个是可运行文件1的实现,其中1个正在工作(isInterrupted返回true或comes InterupptedException和than task ended),但不是其他

可运行工作(我使用它进行测试):

new Runnable() {
          @Override
          public void run() {
                    int i=0;
                    while(!Thread.currentThread().isInterrupted()){
                        try {
                            System.out.println(i);
                            Thread.currentThread().sleep(2000);
                        } catch (InterruptedException e) {
                            MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
                            return;
                        }
                        i++;
                    }
                }
            }

不工作(我要使用的实际代码):

new Runnable(){
            @Override
            public void run() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
                        break;
                    }
                }
            }
        };

和1个可能的解决方案是使用变量(布尔)作为可运行的中断标志,我将考虑作为最后的手段,但很乐意知道错误。p>


共 (2) 个答案

  1. # 1 楼答案

    根据{}文档,关闭正在执行的任务是在尽最大努力的基础上完成的

    因此,当您调用ExecutorService.shutdownNow()时,实现将尝试关闭所有当前正在执行的任务。每个任务将一直运行,直到达到检测到中断的点为止

    为了确保线程在早期阶段达到该点,最好在循环中添加一个检查,检查线程是否被插入,如下所示:

    Thread.currentThread().isInterrupted();
    

    通过在每次迭代中进行此调用,线程将在距离实际中断较短的时间间隔内检测到中断

    因此,修改后的Callable代码如下所示:

    private Callable<Object> getTestAllCallable() {
        return new Callable<Object>() {
            @Override
            public Object call() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if(Thread.currentThread().isInterrupted()) {
                        return null;
                    }
                    if(someCondition) {
                        //someWork
                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    }
                }
                return null;
            }
        };
    }
    

    顺便说一下,如果您不打算从call()方法返回任何值,那么使用Callable是没有意义的。如果您的任务中需要参数化类型,只需创建一个参数化的Runnable,如下所示:

    public class ParameterizedRunnable<T> implements Runnable {
        private final T t;
    
        public ParameterizedRunnable(T t) {
            this.t = t;
        }
    
        public void run() {
            //do some work here
        }
    }
    
  2. # 2 楼答案

    解决方案(出路): 因此,最后我继续使用自定义内部标志(布尔值)作为线程中断标志,MyRunnable将在每次迭代中检查该标志(使用自定义标志的runnable的自定义实现,以便与每个runnable关联一个标志)。当需要取消ExecutorService(ThreadPool)下的线程时,我迭代所有未来的对象,并将其关联到MyRunnable,然后将其中断标志(自定义标志)设置为true,这将中断/关闭线程

    线程池管理器:

    private class ThreadPoolManager {
    
            private ExecutorService executorService;
            private final Map<Future,MyRunnable> queuedFutures;
            private final BlockingQueue<Runnable> blockingQueue;
    
            private ThreadPoolManager() {
                MyLogger.log(DEBUG, "Threadpool-created(constructor)");
                queuedFutures = new HashMap<>();
                blockingQueue = new LinkedBlockingDeque<>();
                executorService = getNewExecutorService();
            }
    
            private ExecutorService getNewExecutorService() {
                return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
            }
    
            private void submitItemTest(MyRunnable runnable) {
                MyLogger.log(DEBUG, "Threadpool-submitted item test");
                if (executorService.isShutdown()) {
                    executorService = getNewExecutorService();
                }
                Future future = executorService.submit(runnable);
                queuedFutures.put(future,runnable);
            }
    
            private void submitTestAll(MyRunnable runnable) {
                MyLogger.log(DEBUG, "Threadpool-submitted test all");
                if (executorService.isShutdown()) {
                    executorService = getNewExecutorService();
                }
                cancelAll();
                Future future = executorService.submit(runnable);
                queuedFutures.put(future,runnable);
            }
    
            private void cancelAll() {
                MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks");
                blockingQueue.clear();
                for (Future future : queuedFutures.keySet()) {
                    if (!future.isDone()) {
                        queuedFutures.get(future).continueRunning=false;
                        MyLogger.log(DEBUG, "Cancelled");
                    }
                }
                queuedFutures.clear();
            }
    
            private void shutdownExecutor() {
                cancelAll();
                MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool");
                executorService.shutdown();
            }
        }
    

    MyRunnable(实现Runnable的抽象类):

    private abstract class MyRunnable implements Runnable {
            boolean continueRunning=true;
        }
    

    MyRunnable(抽象类MyRunnable的实例):

    new MyRunnable() {
           @Override
           public void run() {
               for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                     if (continueRunning) {
                            //someWork
                     } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)");
                         break;
                     }
                }
                System.out.println("ThreadPool: Test complete");
             }
         };
    

    现在,打电话给threadPoolManager。shutdownExecutor()关闭/中断当前运行的所有线程