diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java index 5a059b74df..9bbcbb923c 100644 --- a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java @@ -2,11 +2,9 @@ package com.baeldung.producerconsumer; public class Consumer implements Runnable { private final DataQueue dataQueue; - private volatile boolean runFlag; public Consumer(DataQueue dataQueue) { this.dataQueue = dataQueue; - runFlag = true; } @Override @@ -15,22 +13,23 @@ public class Consumer implements Runnable { } public void consume() { - while (runFlag) { - Message message; - if (dataQueue.isEmpty()) { - try { - dataQueue.waitOnEmpty(); - } catch (InterruptedException e) { - e.printStackTrace(); + while (dataQueue.runFlag) { + synchronized (this) { + while (dataQueue.isEmpty() && dataQueue.runFlag) { + try { + dataQueue.waitOnEmpty(); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + if (!dataQueue.runFlag) { break; } + Message message = dataQueue.remove(); + dataQueue.notifyAllForFull(); + useMessage(message); } - if (!runFlag) { - break; - } - message = dataQueue.remove(); - dataQueue.notifyAllForFull(); - useMessage(message); } System.out.println("Consumer Stopped"); } @@ -45,7 +44,7 @@ public class Consumer implements Runnable { } public void stop() { - runFlag = false; + dataQueue.runFlag = false; dataQueue.notifyAllForEmpty(); } } diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java index 6ab4fa2bc3..8867ddeb63 100644 --- a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java @@ -9,6 +9,8 @@ public class DataQueue { private final Object FULL_QUEUE = new Object(); private final Object EMPTY_QUEUE = new Object(); + public boolean runFlag = true; + DataQueue(int maxSize) { this.maxSize = maxSize; } diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java index 80d693bd97..04ad39f26e 100644 --- a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java @@ -2,13 +2,11 @@ package com.baeldung.producerconsumer; public class Producer implements Runnable { private final DataQueue dataQueue; - private volatile boolean runFlag; private static int idSequence = 0; public Producer(DataQueue dataQueue) { this.dataQueue = dataQueue; - runFlag = true; } @Override @@ -17,21 +15,23 @@ public class Producer implements Runnable { } public void produce() { - while (runFlag) { - Message message = generateMessage(); - while (dataQueue.isFull()) { - try { - dataQueue.waitOnFull(); - } catch (InterruptedException e) { - e.printStackTrace(); + while (dataQueue.runFlag) { + synchronized (this) { + while (dataQueue.isFull() && dataQueue.runFlag) { + try { + dataQueue.waitOnFull(); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + if (!dataQueue.runFlag) { break; } + Message message = generateMessage(); + dataQueue.add(message); + dataQueue.notifyAllForEmpty(); } - if (!runFlag) { - break; - } - dataQueue.add(message); - dataQueue.notifyAllForEmpty(); } System.out.println("Producer Stopped"); } @@ -47,7 +47,7 @@ public class Producer implements Runnable { } public void stop() { - runFlag = false; + dataQueue.runFlag = false; dataQueue.notifyAllForFull(); } }