From c60c8705060224c6f606cb7428747278638e3945 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 18 Jan 2017 22:50:56 +0000 Subject: [PATCH 1/3] BAEL-613 --- .../countdownlatch/BrokenWorker.java | 23 +++++++ .../concurrent/countdownlatch/Worker.java | 22 +++++++ .../CountdownLatchExampleTest.java | 64 +++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 core-java/src/main/java/com/baeldung/concurrent/countdownlatch/BrokenWorker.java create mode 100644 core-java/src/main/java/com/baeldung/concurrent/countdownlatch/Worker.java create mode 100644 core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java diff --git a/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/BrokenWorker.java b/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/BrokenWorker.java new file mode 100644 index 0000000000..90cd01b69f --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/BrokenWorker.java @@ -0,0 +1,23 @@ +package com.baeldung.concurrent.countdownlatch; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class BrokenWorker implements Runnable { + private final List outputScraper; + private final CountDownLatch countDownLatch; + + public BrokenWorker(final List outputScraper, final CountDownLatch countDownLatch) { + this.outputScraper = outputScraper; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + if (true) { + throw new RuntimeException("Oh dear"); + } + countDownLatch.countDown(); + outputScraper.add("Counted down"); + } +} diff --git a/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/Worker.java b/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/Worker.java new file mode 100644 index 0000000000..4701f26530 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/Worker.java @@ -0,0 +1,22 @@ +package com.baeldung.concurrent.countdownlatch; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class Worker implements Runnable { + private final List outputScraper; + private final CountDownLatch countDownLatch; + + public Worker(final List outputScraper, final CountDownLatch countDownLatch) { + this.outputScraper = outputScraper; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + // Do some work + System.out.println("Doing some logic"); + countDownLatch.countDown(); + outputScraper.add("Counted down"); + } +} diff --git a/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java b/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java new file mode 100644 index 0000000000..9aa574a067 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java @@ -0,0 +1,64 @@ +package com.baeldung.concurrent.countdownlatch; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +public class CountdownLatchExampleTest { + @Test + public void shouldBlockUntilLatchIsReleased() throws InterruptedException { + + // Given + List outputScraper = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch countDownLatch = new CountDownLatch(5); + List workers = Stream + .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) + .limit(5) + .collect(toList()); + + // When + workers.forEach(Thread::start); + countDownLatch.await(); // Block until workers finish + outputScraper.add("Latch released"); + + // Then + outputScraper.forEach(Object::toString); + assertThat(outputScraper) + .containsExactly( + "Counted down", + "Counted down", + "Counted down", + "Counted down", + "Counted down", + "Latch released" + ); + } + + @Test + public void shouldEventuallyTimeout() throws InterruptedException { + // Given + List outputScraper = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch countDownLatch = new CountDownLatch(5); + List workers = Stream + .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) + .limit(5) + .collect(toList()); + + // When + workers.forEach(Thread::start); + final boolean result = countDownLatch.await(3L, TimeUnit.SECONDS); + + // Then + assertThat(result).isTrue(); + } +} \ No newline at end of file From 65e7c7fa300584bac1427ba31de88a85b0bf29b8 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 20 Jan 2017 20:48:48 +0000 Subject: [PATCH 2/3] Updated test names --- .../concurrent/countdownlatch/CountdownLatchExampleTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java b/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java index 9aa574a067..484eedff24 100644 --- a/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java +++ b/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java @@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class CountdownLatchExampleTest { @Test - public void shouldBlockUntilLatchIsReleased() throws InterruptedException { + public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { // Given List outputScraper = Collections.synchronizedList(new ArrayList<>()); @@ -45,7 +45,7 @@ public class CountdownLatchExampleTest { } @Test - public void shouldEventuallyTimeout() throws InterruptedException { + public void whenFailingToParallelProcess_thenMainThreadShouldTimeout() throws InterruptedException { // Given List outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(5); From 21649ab375c3a7411b509ed4e1efbb016bbcdac3 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Sat, 21 Jan 2017 20:50:44 +0000 Subject: [PATCH 3/3] BAEL-613 - Added workers which wait for all other workers to start --- .../countdownlatch/WaitingWorker.java | 37 +++++++++++++++++ .../CountdownLatchExampleTest.java | 40 +++++++++++++++++-- 2 files changed, 73 insertions(+), 4 deletions(-) create mode 100644 core-java/src/main/java/com/baeldung/concurrent/countdownlatch/WaitingWorker.java diff --git a/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/WaitingWorker.java b/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/WaitingWorker.java new file mode 100644 index 0000000000..58a2a5f6b4 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/countdownlatch/WaitingWorker.java @@ -0,0 +1,37 @@ +package com.baeldung.concurrent.countdownlatch; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class WaitingWorker implements Runnable { + + private final List outputScraper; + private final CountDownLatch readyThreadCounter; + private final CountDownLatch callingThreadBlocker; + private final CountDownLatch completedThreadCounter; + + public WaitingWorker(final List outputScraper, + final CountDownLatch readyThreadCounter, + final CountDownLatch callingThreadBlocker, + CountDownLatch completedThreadCounter) { + + this.outputScraper = outputScraper; + this.readyThreadCounter = readyThreadCounter; + this.callingThreadBlocker = callingThreadBlocker; + this.completedThreadCounter = completedThreadCounter; + } + + @Override + public void run() { + // Mark this thread as read / started + readyThreadCounter.countDown(); + try { + callingThreadBlocker.await(); + outputScraper.add("Counted down"); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + completedThreadCounter.countDown(); + } + } +} diff --git a/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java b/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java index 484eedff24..2e77042f0b 100644 --- a/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java +++ b/core-java/src/test/java/com/baeldung/concurrent/countdownlatch/CountdownLatchExampleTest.java @@ -1,6 +1,5 @@ package com.baeldung.concurrent.countdownlatch; -import org.assertj.core.api.Assertions; import org.junit.Test; import java.util.ArrayList; @@ -8,7 +7,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; @@ -17,7 +15,6 @@ import static org.assertj.core.api.Assertions.assertThat; public class CountdownLatchExampleTest { @Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { - // Given List outputScraper = Collections.synchronizedList(new ArrayList<>()); CountDownLatch countDownLatch = new CountDownLatch(5); @@ -59,6 +56,41 @@ public class CountdownLatchExampleTest { final boolean result = countDownLatch.await(3L, TimeUnit.SECONDS); // Then - assertThat(result).isTrue(); + assertThat(result).isFalse(); } + + @Test + public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { + // Given + List outputScraper = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch readyThreadCounter = new CountDownLatch(5); + CountDownLatch callingThreadBlocker = new CountDownLatch(1); + CountDownLatch completedThreadCounter = new CountDownLatch(5); + List workers = Stream + .generate(() -> new Thread(new WaitingWorker(outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) + .limit(5) + .collect(toList()); + + // When + workers.forEach(Thread::start); + readyThreadCounter.await(); // Block until workers start + outputScraper.add("Workers ready"); + callingThreadBlocker.countDown(); // Start workers + completedThreadCounter.await(); // Block until workers finish + outputScraper.add("Workers complete"); + + // Then + outputScraper.forEach(Object::toString); + assertThat(outputScraper) + .containsExactly( + "Workers ready", + "Counted down", + "Counted down", + "Counted down", + "Counted down", + "Counted down", + "Workers complete" + ); + } + } \ No newline at end of file