
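Java deadlock in ThreadPoolExecutor

I have run into a situation where ThreadPoolExecutor is stuck in execute(Runnable) while all of the pool's threads are waiting in getTask, and the work queue is empty.

Does anyone have any ideas?

The ThreadPoolExecutor is created with an ArrayBlockingQueue and corePoolSize == maximumPoolSize == 4.

[Edit] To be more precise, the thread is blocked in ThreadPoolExecutor.execute(Runnable command). It has a task to execute, but the task is never executed.

[Edit2] The executor is blocked somewhere inside the work queue (ArrayBlockingQueue).

[Edit3] Call stack: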

thread = front_end(224)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653)
at net.listenThread.WorkersPool.execute(WorkersPool.java:45)

Meanwhile, the work queue is empty (checked with remote debugging).

[Edit4] The code that uses ThreadPoolExecutor:

public class WorkersPool {
  // IDLE_WORKER_THREAD_TIMEOUT, WORK_QUEUE_CAPACITY and THREAD_TERMINATION_WAIT_TIME
  // are constants defined elsewhere (not shown in the question)
  private final ThreadPoolExecutor pool;

  public WorkersPool(int size) {
    pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
        new ThreadFactory() {
          @NotNull
          private final AtomicInteger threadsCount = new AtomicInteger(0);

          @NotNull
          public Thread newThread(@NotNull Runnable r) {
            final Thread thread = new Thread(r);
            thread.setName("net_worker_" + threadsCount.incrementAndGet());
            return thread;
          }
        },
        // Tasks that do not fit into the full queue are logged and dropped
        new RejectedExecutionHandler() {
          public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) {
            Verify.warning("new task " + r + " is discarded");
          }
        });
  }

  public void execute(@NotNull Runnable task) {
    pool.execute(task);
  }

  public void stopWorkers() throws WorkersTerminationFailedException {
    pool.shutdownNow();
    try {
      pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      throw new WorkersTerminationFailedException("Workers-pool termination failed", e);
    }
  }
}

6 answers

  1. # Answer 1

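    This sounds like a JVM bug in versions earlier than 6u21. There was a problem in the compiled native code for some (possibly all) operating systems.

    From the link: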

    The bug is caused by missing memory barriers in various Parker::park() paths that can result in lost wakeups and hangs. (Note that PlatformEvent::park used by built-in synchronization is not vulnerable to the issue). -XX:+UseMembar constitutes a work-around because the membar barrier in the state transition logic hides the problem in Parker::. (that is, there's nothing wrong with the -UseMembar mechanism, but +UseMembar hides the bug in Parker::). This is a day-one bug introduced with the addition of java.util.concurrent in JDK 5.0. I developed a simple C model of the failure and it seems more likely to manifest on modern AMD and Nehalem platforms, likely because of deeper store buffers that take longer to drain. I provided a tentative fix to Doug Lea for Parker::park which appears to eliminate the bug. I'll be delivering this fix to runtime. (I'll also augment the CR with additional test cases and a longer explanation). This is likely a good candidate for back-ports.

    Link: JVM Bug

    A workaround is available (-XX:+UseMembar), but you are better off simply getting the latest version of Java.
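    To check whether a given JVM falls into the range this answer describes, a small helper can parse the java.version system property. This is a minimal sketch: the class and method names are hypothetical, and the check only encodes the answer's "earlier than 6u21" claim (treating every Java 5 release as affected), so confirm the fixed release against the bug report itself.

    ```java
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical helper: flags JVMs older than 6u21, following the answer's claim.
    final class JvmVersionCheck {
        // Pre-Java-9 version strings look like "1.6.0_20" or "1.6.0-b105".
        private static final Pattern LEGACY = Pattern.compile("^1\\.(\\d+)\\.\\d+(?:_(\\d+))?");

        static boolean predates6u21(String version) {
            Matcher m = LEGACY.matcher(version);
            if (!m.find()) {
                return false; // Java 9+ scheme ("11.0.2", "17") or unrecognized
            }
            int feature = Integer.parseInt(m.group(1)); // 5, 6, 7, 8, ...
            if (feature != 6) {
                return feature < 6; // every Java 5 release counts as affected
            }
            // "1.6.0" without an update suffix is treated as update 0
            int update = m.group(2) == null ? 0 : Integer.parseInt(m.group(2));
            return update < 21;
        }

        public static void main(String[] args) {
            String version = System.getProperty("java.version");
            if (predates6u21(version)) {
                System.err.println("JVM " + version + " predates 6u21: consider upgrading");
            } else {
                System.out.println("JVM " + version + " is 6u21 or later");
            }
        }
    }
    ```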

  2. # Answer 2

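    Below is some library source code (actually a class from http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip). It is a bit complicated (if I'm not mistaken, it adds protection against the FutureTask being run more than once), but it does not seem prone to deadlock; it is a very simple use of the thread pool: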

    package net.spy.memcached.transcoders;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    import net.spy.memcached.CachedData;
    import net.spy.memcached.compat.SpyObject;
    
    /**
     * Asynchronous transcoder.
     */
    public class TranscodeService extends SpyObject {
    
        private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
                new ThreadPoolExecutor.DiscardPolicy());
    
        /**
         * Perform a decode.
         */
        public <T> Future<T> decode(final Transcoder<T> tc,
                final CachedData cachedData) {
    
            assert !pool.isShutdown() : "Pool has already shut down.";
    
            TranscodeService.Task<T> task = new TranscodeService.Task<T>(
                    new Callable<T>() {
                        public T call() {
                            return tc.decode(cachedData);
                        }
                    });
    
            if (tc.asyncDecode(cachedData)) {
                this.pool.execute(task);
            }
            return task;
        }
    
        /**
         * Shut down the pool.
         */
        public void shutdown() {
            pool.shutdown();
        }
    
        /**
         * Ask whether this service has been shut down.
         */
        public boolean isShutdown() {
            return pool.isShutdown();
        }
    
        private static class Task<T> extends FutureTask<T> {
            private final AtomicBoolean isRunning = new AtomicBoolean(false);
    
            public Task(Callable<T> callable) {
                super(callable);
            }
    
            @Override
            public T get() throws InterruptedException, ExecutionException {
                this.run();
                return super.get();
            }
    
            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException,
                    ExecutionException, TimeoutException {
                this.run();
                return super.get(timeout, unit);
            }
    
            @Override
            public void run() {
                if (this.isRunning.compareAndSet(false, true)) {
                    super.run();
                }
            }
        }
    
    }
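    The run-once guard in TranscodeService.Task matters when the pool drops a task: with DiscardPolicy, execute() silently discards a task once the queue is full and no more threads can be added, and a plain FutureTask.get() on that task would then wait forever. The sketch below reuses the same idea (class names are hypothetical; the one-thread pool with a one-slot queue exists only to force the discard) to show get() falling back to running the task in the caller's thread.

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class RunOnceDemo {
        // Same idea as TranscodeService.Task: whoever calls run() first executes the
        // callable; get() runs it in the caller's thread if no pool thread started it.
        static final class RunOnceTask<T> extends FutureTask<T> {
            private final AtomicBoolean started = new AtomicBoolean(false);

            RunOnceTask(Callable<T> callable) { super(callable); }

            @Override
            public void run() {
                if (started.compareAndSet(false, true)) {
                    super.run();
                }
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                run(); // no-op if the task was already started elsewhere
                return super.get();
            }
        }

        static String demo() throws Exception {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.DiscardPolicy());
            CountDownLatch release = new CountDownLatch(1);
            try {
                pool.execute(() -> { // occupies the only worker thread
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                pool.execute(() -> { }); // fills the one-slot queue
                RunOnceTask<String> task = new RunOnceTask<>(() -> "decoded");
                pool.execute(task);      // queue full, pool at max: silently discarded
                return task.get();       // runs in this thread instead of blocking forever
            } finally {
                release.countDown();
                pool.shutdown();
            }
        }

        public static void main(String[] args) throws Exception {
            System.out.println(demo());
        }
    }
    ```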
    
  3. # Answer 3

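    Very strange.

    Try the following for the TPE (ThreadPoolExecutor):

    • a different BlockingQueue, e.g. LinkedBlockingQueue

    • fairness=true in the ArrayBlockingQueue, i.e. new ArrayBlockingQueue(n, true)

    Of these two options I would pick the second, because offer() being blocked is very strange; one reason that comes to mind is the thread scheduling policy on Linux. This is just a hypothesis.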
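    Both suggestions only change the queue passed to the ThreadPoolExecutor constructor. A minimal sketch, assuming the question's pool size of 4 and a hypothetical capacity of 100 (the factory class and its method names are also hypothetical). According to the ArrayBlockingQueue documentation, fairness makes threads blocked on insertion or removal proceed in FIFO order and generally lowers throughput; it does not by itself explain or guarantee a fix for the hang.

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Hypothetical factory showing the two queue choices from the answer.
    final class PoolFactory {
        // Option 1: a bounded LinkedBlockingQueue, which guards put and take
        // with two separate locks instead of ArrayBlockingQueue's single lock.
        static ThreadPoolExecutor withLinkedQueue(int size, int capacity) {
            return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(capacity));
        }

        // Option 2: ArrayBlockingQueue(capacity, true) makes the queue's internal
        // ReentrantLock fair (the question's stack trace shows the default NonfairSync).
        static ThreadPoolExecutor withFairArrayQueue(int size, int capacity) {
            return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(capacity, true));
        }

        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor pool = withFairArrayQueue(4, 100);
            pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
    ```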

  4. # Answer 4

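    As someone already mentioned, this sounds normal: the ThreadPoolExecutor is just waiting for work to do. If you want to stop it, you need to call:

    executor.shutdown()

    to make it terminate, usually followed by executor.awaitTermination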
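    A common way to combine the two calls is to shut down gracefully first and fall back to shutdownNow() if the workers do not finish in time, similar to the shutdown-and-await pattern in the ExecutorService documentation. A minimal sketch; the class and method names are hypothetical, and the caller chooses the timeout:

    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    final class PoolShutdown {
        // Returns true if the pool terminated within the allowed time.
        static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
            pool.shutdown(); // reject new tasks; already-queued tasks still run
            try {
                if (pool.awaitTermination(timeout, unit)) {
                    return true;
                }
                pool.shutdownNow(); // interrupt running tasks, remove queued ones
                return pool.awaitTermination(timeout, unit);
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
                return false;
            }
        }

        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            pool.execute(() -> System.out.println("working"));
            System.out.println("terminated: " + shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
        }
    }
    ```

    Unlike stopWorkers() in the question, this checks the result of awaitTermination() instead of ignoring it.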

  5. # Answer 5

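    I don't see any locking in the ThreadPoolExecutor's execute(Runnable) code. The only variable there is the workQueue. What kind of BlockingQueue did you give your ThreadPoolExecutor?

    On the topic of deadlocks:

    You can confirm that this is a deadlock by examining a full thread dump, such as the one produced by <ctrl><break> on Windows or kill -QUIT on UNIX systems.

    Once you have that data, you can examine the threads. Here is the relevant excerpt from Sun's article on examining thread dumps (suggested reading):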

    For hanging, deadlocked or frozen programs: If you think your program is hanging, generate a stack trace and examine the threads in states MW or CW. If the program is deadlocked then some of the system threads will probably show up as the current threads, because there is nothing else for the JVM to do.
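    The same information can also be collected from inside the JVM through java.lang.management. A minimal sketch (class and method names are hypothetical): ThreadMXBean.findDeadlockedThreads() reports cycles of threads waiting on object monitors or ownable synchronizers such as ReentrantLock, and dumpAllThreads() returns per-thread state, held locks and stack frames similar to a full dump. A thread parked in ReentrantLock.lock() that is not part of a lock cycle, as in the question's stack trace, is not reported as deadlocked, so the full dump is still worth reading.

    ```java
    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;
    import java.util.ArrayList;
    import java.util.List;

    final class DeadlockCheck {
        private static final ThreadMXBean MX = ManagementFactory.getThreadMXBean();

        // Names of threads in a lock cycle (monitors or ownable synchronizers);
        // empty if no cycle is found.
        static List<String> deadlockedThreadNames() {
            List<String> names = new ArrayList<>();
            long[] ids = MX.findDeadlockedThreads(); // null when there is no deadlock
            if (ids == null) {
                return names;
            }
            for (ThreadInfo info : MX.getThreadInfo(ids)) {
                if (info != null) { // null if the thread has died meanwhile
                    names.add(info.getThreadName());
                }
            }
            return names;
        }

        // Every live thread with its state, the lock it waits on and the locks it holds.
        // Note: ThreadInfo.toString() prints at most 8 stack frames per thread.
        static String dumpAllThreads() {
            StringBuilder sb = new StringBuilder();
            for (ThreadInfo info : MX.dumpAllThreads(true, true)) {
                sb.append(info);
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            System.out.println("Deadlocked: " + deadlockedThreadNames());
            System.out.print(dumpAllThreads());
        }
    }
    ```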

    On a lighter note: if you are running in an IDE, can you make sure there are no breakpoints enabled in these methods?

  6. # Answer 6

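    This kind of deadlock can happen if you run tasks from within the executor itself. For example, you submit a task that in turn spawns 4 more tasks. If the pool size is 4, the pool is completely saturated: the last forked task waits for one of the running tasks to return a value, but the first task is waiting for all of its forked tasks to complete.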
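    This scenario is usually called a thread-starvation deadlock: tasks running on a bounded pool block while waiting for other tasks that need a thread from the same pool. The sketch below uses hypothetical names, and a timeout stands in for the real hang so that the program ends. It fills every worker with a parent task that submits a child to the same pool and waits for it; because no worker is free, each child stays queued and each parent times out. The two CountDownLatches only make the demo deterministic: the first ensures all parents occupy a worker before any child is submitted, and the second keeps the workers busy until every parent has checked its child.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class StarvationDemo {
        // Returns how many parent tasks gave up waiting for their child.
        static int countStarvedParents(int poolSize, long waitMillis)
                throws InterruptedException, ExecutionException {
            ExecutorService pool = Executors.newFixedThreadPool(poolSize);
            CountDownLatch allRunning = new CountDownLatch(poolSize);
            CountDownLatch allChecked = new CountDownLatch(poolSize);
            try {
                List<Future<Boolean>> parents = new ArrayList<>();
                for (int i = 0; i < poolSize; i++) {
                    parents.add(pool.submit(() -> {
                        allRunning.countDown();
                        allRunning.await(); // now every worker thread holds a parent
                        Future<String> child = pool.submit(() -> "child done");
                        boolean starved;
                        try {
                            child.get(waitMillis, TimeUnit.MILLISECONDS);
                            starved = false;
                        } catch (TimeoutException e) {
                            starved = true; // child stayed queued: no free worker
                        }
                        allChecked.countDown();
                        allChecked.await(); // hold the thread until every parent has checked
                        return starved;
                    }));
                }
                int starvedCount = 0;
                for (Future<Boolean> f : parents) {
                    if (f.get()) {
                        starvedCount++;
                    }
                }
                return starvedCount;
            } finally {
                pool.shutdownNow();
            }
        }

        public static void main(String[] args) throws Exception {
            int starved = countStarvedParents(4, 200);
            System.out.println(starved + " of 4 parent tasks timed out waiting for their child");
        }
    }
    ```

    Without the timeout, the parents would wait forever, which matches the hang described in this answer. Giving the child tasks their own executor removes this particular cycle.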