Java producer-consumer with semaphores runs into a deadlock

I am trying to build a producer-consumer that uses only semaphores, with the following code:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class Application {

    public static int id = 0;

    public static void main(String[] args) {

        Semaphore producerSem = new Semaphore(1);
        Semaphore consumerSem = new Semaphore(1);
        Queue<Integer> line = new LinkedList<>();

        Runnable produce = () -> {
            try {
                producerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " producing");
                while (line.size() > 10) continue;
                Thread.sleep(2000);
                line.offer(id);
                System.out.println(Thread.currentThread().getName() + " produced thing with id: " + id);
                id++;
                System.out.println(Thread.currentThread().getName() + " finished producing");
                producerSem.release();

            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable consume = () -> {
            try {
                consumerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " consuming");
                while (line.size() < 1) continue;
                int product = line.remove();
                System.out.println(Thread.currentThread().getName() + " consumed thing with id: " + product);
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + " finished consuming");
                consumerSem.release();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
}

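It ends up in a deadlock, and I cannot debug it: it gets to id 10 and stops, so it looks as if one element has been removed but it cannot move forward. With a breakpoint set anywhere it works fine, even one at the end of either task.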


1 answer

  1. # Answer 1

    I think writing thread-safe code using only semaphores is a good exercise.

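    You should protect three things:

    • id is a plain global integer, so only one thread may write to it at a time
    • adding jobs to the queue and removing them from it must also be protected
    • although we could use the queue's own protection to check for availability, a more elegant way to wait for new jobs is a synchronized counter (a semaphore). That way we do not have to take the queue lock just to check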
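
    The third point can be illustrated in isolation. This is only a minimal sketch; the class name AvailabilityCounterDemo is hypothetical and not part of the original code. A Semaphore created with 0 permits behaves as a thread-safe counter: release() adds one, and acquire() blocks until the count is positive, then subtracts one.

```java
import java.util.concurrent.Semaphore;

public class AvailabilityCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        // Starts at 0: nothing is available yet.
        Semaphore available = new Semaphore(0);

        // A non-blocking attempt fails while the count is 0.
        System.out.println("before: " + available.tryAcquire()); // before: false

        // A "producer" announces two items.
        available.release();
        available.release();
        System.out.println("permits: " + available.availablePermits()); // permits: 2

        // A "consumer" takes one; acquire() would block if the count were 0.
        available.acquire();
        System.out.println("permits: " + available.availablePermits()); // permits: 1
    }
}
```

    No busy-wait loop on the queue size is needed: a consumer that calls acquire() simply sleeps until some producer calls release().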

    The modified code could be:

    // fields of the enclosing class; needs java.util.LinkedList, java.util.Queue,
    // java.util.concurrent.Semaphore and java.util.concurrent.ThreadLocalRandom

    // our `id` counter...
    private static int id = 0;
    // ...and its semaphore for thread safety
    private static Semaphore idSemaphore = new Semaphore(1);
    
    // jobs queue...
    private static Queue<Integer> lineJobs = new LinkedList<>();
    // ...and its semaphore for thread safety
    private static Semaphore lineSemaphore = new Semaphore(1);
    
    // thread-safe counter of available jobs; consumers block on it while it is 0
    private static Semaphore lineSemaphoreAvailable = new Semaphore(0);
    
    public static void main(String[] args) {
    
    
        Runnable produce = () -> {
    
            // we will set `myId` only once
            final int myId;
    
            // create a new id
            try {
                // wait for permission to touch the global (unsafe integer) `id`
                idSemaphore.acquire();
            } catch (InterruptedException e) {
                // nothing was acquired, so nothing must be released
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                myId = id++;
            } finally {
                // release only after a successful acquire
                idSemaphore.release();
            }
    
            System.out.printf("%s: producing job #%d...%n", Thread.currentThread().getName(), myId);
    
            // producing job...
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
    
            // store the job
            try {
                // wait for permission to touch the global (unsafe queue) `lineJobs`
                lineSemaphore.acquire();
            } catch (InterruptedException e) {
                // nothing was acquired, so nothing must be released
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                lineJobs.add(myId);
    
                // notify consumers that a new job is available to take
                lineSemaphoreAvailable.release();
    
                System.out.printf("%s: job #%d ready to do!%n", Thread.currentThread().getName(), myId);
            } finally {
                lineSemaphore.release();
            }
        };
    
        Runnable consume = () -> {
    
            final int myId;
    
            // get a job
            try {
                // wait for a new job to become available
                lineSemaphoreAvailable.acquire();
                
                // wait for permission to touch the global (unsafe queue) `lineJobs`
                lineSemaphore.acquire();
            } catch (InterruptedException e) {
                // the queue semaphore was not acquired, so it must not be released
                System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
                return;
            }
            try {
                myId = lineJobs.remove();
                System.out.printf("%s: job #%d adquired%n", Thread.currentThread().getName(), myId);
            } finally {
                lineSemaphore.release();
            }
    
            System.out.printf("%s: job #%d finished!%n", Thread.currentThread().getName(), myId);
        };
    
        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
    

    which produces the output:

    Producer - 0: producing job #0...
    Producer - 1: producing job #1...
    ...
    Producer - 46: producing job #45...
    Producer - 45: producing job #44...
    Producer - 9: job #9 ready to do!
    Producer - 44: producing job #46...
    Producer - 47: producing job #47...
    Producer - 48: producing job #48...
    Producer - 49: producing job #49...
    Consumer - 0: job #9 adquired
    Consumer - 0: job #9 finished!
    Producer - 14: job #14 ready to do!
    Consumer - 1: job #14 adquired
    Consumer - 1: job #14 finished!
    Producer - 50: producing job #50...
    Producer - 51: producing job #51...
    ...
    Producer - 72: producing job #72...
    Producer - 39: job #39 ready to do!
    Consumer - 2: job #39 adquired
    Consumer - 2: job #39 finished!
    Producer - 73: producing job #73...
    Producer - 74: producing job #74...
    Producer - 75: producing job #75...
    Producer - 10: job #10 ready to do!
    Producer - 40: job #40 ready to do!
    Consumer - 3: job #10 adquired
    Consumer - 3: job #10 finished!
    Producer - 76: producing job #76...
    Producer - 77: producing job #77...
    ...
    Producer - 82: producing job #82...
    Producer - 83: producing job #83...
    Producer - 84: producing job #84...
    Consumer - 4: job #40 adquired
    Consumer - 4: job #40 finished!
    Producer - 85: producing job #85...
    Producer - 86: producing job #86...
    Producer - 71: job #71 ready to do!
    Producer - 87: producing job #87...
    Consumer - 5: job #71 adquired
    Consumer - 5: job #71 finished!
    Producer - 88: producing job #88...
    Producer - 92: producing job #92...
    Producer - 91: producing job #91...
    ...
    Producer - 96: producing job #95...
    Producer - 98: producing job #98...
    Producer - 99: producing job #99...
    Producer - 84: job #84 ready to do!
    Consumer - 6: job #84 adquired
    Consumer - 6: job #84 finished!
    ...
    Consumer - 97: job #63 adquired
    Consumer - 97: job #63 finished!
    Producer - 90: job #90 ready to do!
    Consumer - 98: job #90 adquired
    Consumer - 98: job #90 finished!
    Producer - 87: job #87 ready to do!
    Consumer - 99: job #87 adquired
    Consumer - 99: job #87 finished!
    
    Process finished with exit code 0
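
    The code in the question also tried to cap the queue at roughly ten items, which the version above does not do. A third semaphore that counts free slots can add that limit. What follows is only a sketch of the classic bounded-buffer pattern, not part of the original answer; the class name BoundedBuffer, its put/take methods, and the capacity of 10 are assumptions made for illustration.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class BoundedBuffer {
    private final Queue<Integer> items = new LinkedList<>();
    private final Semaphore mutex = new Semaphore(1);  // guards `items`
    private final Semaphore filled = new Semaphore(0); // items ready to take
    private final Semaphore free;                      // free slots left

    public BoundedBuffer(int capacity) {
        free = new Semaphore(capacity);
    }

    public void put(int item) throws InterruptedException {
        free.acquire();                 // blocks while the buffer is full
        mutex.acquireUninterruptibly(); // short critical section
        try {
            items.add(item);
        } finally {
            mutex.release();
        }
        filled.release();               // one more item is available
    }

    public int take() throws InterruptedException {
        filled.acquire();               // blocks while the buffer is empty
        mutex.acquireUninterruptibly();
        int item;
        try {
            item = items.remove();
        } finally {
            mutex.release();
        }
        free.release();                 // one more slot is free
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(10); // assumed capacity
        long[] sum = new long[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) sum[0] += buffer.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("sum = " + sum[0]); // sum = 4950
    }
}
```

    The blocking semaphores (free and filled) are acquired before the mutex, so a thread never sleeps while holding the queue lock. Sleeping or spinning while holding the only permit is the kind of stall the question ran into.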