java concurrency (#2502)
* fix spring config * fix spring config * fix spring config * minor fix * fix spring-boot module * fix pom * upgrade jackson * minor fix * java concurrency * cleanup
This commit is contained in:
committed by
Grzegorz Piwowarek
parent
09d10ac85f
commit
25263f1d6f
@@ -0,0 +1,184 @@
|
||||
package com.baeldung.completablefuture;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class CompletableFutureLongRunningUnitTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CompletableFutureLongRunningUnitTest.class);
|
||||
|
||||
@Test
|
||||
public void whenRunningCompletableFutureAsynchronously_thenGetMethodWaitsForResult() throws InterruptedException, ExecutionException {
|
||||
Future<String> completableFuture = calculateAsync();
|
||||
|
||||
String result = completableFuture.get();
|
||||
assertEquals("Hello", result);
|
||||
}
|
||||
|
||||
private Future<String> calculateAsync() throws InterruptedException {
|
||||
CompletableFuture<String> completableFuture = new CompletableFuture<>();
|
||||
|
||||
Executors.newCachedThreadPool()
|
||||
.submit(() -> {
|
||||
Thread.sleep(500);
|
||||
completableFuture.complete("Hello");
|
||||
return null;
|
||||
});
|
||||
|
||||
return completableFuture;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenRunningCompletableFutureWithResult_thenGetMethodReturnsImmediately() throws InterruptedException, ExecutionException {
|
||||
Future<String> completableFuture = CompletableFuture.completedFuture("Hello");
|
||||
|
||||
String result = completableFuture.get();
|
||||
assertEquals("Hello", result);
|
||||
}
|
||||
|
||||
private Future<String> calculateAsyncWithCancellation() throws InterruptedException {
|
||||
CompletableFuture<String> completableFuture = new CompletableFuture<>();
|
||||
|
||||
Executors.newCachedThreadPool()
|
||||
.submit(() -> {
|
||||
Thread.sleep(500);
|
||||
completableFuture.cancel(false);
|
||||
return null;
|
||||
});
|
||||
|
||||
return completableFuture;
|
||||
}
|
||||
|
||||
@Test(expected = CancellationException.class)
|
||||
public void whenCancelingTheFuture_thenThrowsCancellationException() throws ExecutionException, InterruptedException {
|
||||
Future<String> future = calculateAsyncWithCancellation();
|
||||
future.get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenCreatingCompletableFutureWithSupplyAsync_thenFutureReturnsValue() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
|
||||
|
||||
assertEquals("Hello", future.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenAddingThenAcceptToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
|
||||
|
||||
CompletableFuture<Void> future = completableFuture.thenAccept(s -> LOG.debug("Computation returned: " + s));
|
||||
|
||||
future.get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenAddingThenRunToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
|
||||
|
||||
CompletableFuture<Void> future = completableFuture.thenRun(() -> LOG.debug("Computation finished."));
|
||||
|
||||
future.get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenAddingThenApplyToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
|
||||
|
||||
CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
|
||||
|
||||
assertEquals("Hello World", future.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingThenCompose_thenFuturesExecuteSequentially() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
|
||||
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
|
||||
|
||||
assertEquals("Hello World", completableFuture.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingThenCombine_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
|
||||
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
|
||||
|
||||
assertEquals("Hello World", completableFuture.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingThenAcceptBoth_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture.supplyAsync(() -> "Hello")
|
||||
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> LOG.debug(s1 + s2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFutureCombinedWithAllOfCompletes_thenAllFuturesAreDone() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
|
||||
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
|
||||
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
|
||||
|
||||
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
|
||||
|
||||
// ...
|
||||
|
||||
combinedFuture.get();
|
||||
|
||||
assertTrue(future1.isDone());
|
||||
assertTrue(future2.isDone());
|
||||
assertTrue(future3.isDone());
|
||||
|
||||
String combined = Stream.of(future1, future2, future3)
|
||||
.map(CompletableFuture::join)
|
||||
.collect(Collectors.joining(" "));
|
||||
|
||||
assertEquals("Hello Beautiful World", combined);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFutureThrows_thenHandleMethodReceivesException() throws ExecutionException, InterruptedException {
|
||||
String name = null;
|
||||
|
||||
// ...
|
||||
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
|
||||
if (name == null) {
|
||||
throw new RuntimeException("Computation error!");
|
||||
}
|
||||
return "Hello, " + name;
|
||||
})
|
||||
.handle((s, t) -> s != null ? s : "Hello, Stranger!");
|
||||
|
||||
assertEquals("Hello, Stranger!", completableFuture.get());
|
||||
}
|
||||
|
||||
@Test(expected = ExecutionException.class)
|
||||
public void whenCompletingFutureExceptionally_thenGetMethodThrows() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = new CompletableFuture<>();
|
||||
|
||||
// ...
|
||||
|
||||
completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
|
||||
|
||||
// ...
|
||||
|
||||
completableFuture.get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenAddingThenApplyAsyncToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
|
||||
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
|
||||
|
||||
CompletableFuture<String> future = completableFuture.thenApplyAsync(s -> s + " World");
|
||||
|
||||
assertEquals("Hello World", future.get());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.baeldung.concurrent.accumulator;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.LongAccumulator;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class LongAccumulatorUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenLongAccumulator_whenApplyActionOnItFromMultipleThrads_thenShouldProduceProperResult() throws InterruptedException {
|
||||
// given
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||
LongBinaryOperator sum = Long::sum;
|
||||
LongAccumulator accumulator = new LongAccumulator(sum, 0L);
|
||||
int numberOfThreads = 4;
|
||||
int numberOfIncrements = 100;
|
||||
|
||||
// when
|
||||
Runnable accumulateAction = () -> IntStream.rangeClosed(0, numberOfIncrements).forEach(accumulator::accumulate);
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(accumulateAction);
|
||||
}
|
||||
|
||||
// then
|
||||
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executorService.shutdown();
|
||||
assertEquals(accumulator.get(), 20200);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package com.baeldung.concurrent.adder;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static com.jayway.awaitility.Awaitility.await;
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class LongAdderUnitTest {
|
||||
@Test
|
||||
public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThem() throws InterruptedException {
|
||||
//given
|
||||
LongAdder counter = new LongAdder();
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||
|
||||
int numberOfThreads = 4;
|
||||
int numberOfIncrements = 100;
|
||||
|
||||
//when
|
||||
Runnable incrementAction = () -> IntStream
|
||||
.range(0, numberOfIncrements)
|
||||
.forEach((i) -> counter.increment());
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(incrementAction);
|
||||
}
|
||||
|
||||
//then
|
||||
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);
|
||||
assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThemAndResetAdderAfterward() throws InterruptedException {
|
||||
//given
|
||||
LongAdder counter = new LongAdder();
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||
|
||||
int numberOfThreads = 4;
|
||||
int numberOfIncrements = 100;
|
||||
|
||||
//when
|
||||
Runnable incrementAction = () -> IntStream
|
||||
.range(0, numberOfIncrements)
|
||||
.forEach((i) -> counter.increment());
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(incrementAction);
|
||||
}
|
||||
|
||||
//then
|
||||
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(counter.sumThenReset(), numberOfIncrements * numberOfThreads);
|
||||
|
||||
await().until(() -> assertEquals(counter.sum(), 0));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class ThreadSafeCounterTest {
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenSafeCounterWithLockIncrement() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
SafeCounterWithLock safeCounter = new SafeCounterWithLock();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(safeCounter::increment));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, safeCounter.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenSafeCounterWithoutLockIncrement() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
SafeCounterWithoutLock safeCounter = new SafeCounterWithoutLock();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(safeCounter::increment));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, safeCounter.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This test shows the behaviour of a thread-unsafe class in a multithreaded scenario. We are calling
|
||||
* the increment methods 1000 times from a pool of 3 threads. In most of the cases, the counter will
|
||||
* less than 1000, because of lost updates, however, occasionally it may reach 1000, when no threads
|
||||
* called the method simultaneously. This may cause the build to fail occasionally. Hence excluding this
|
||||
* test from build by adding this in manual test
|
||||
*/
|
||||
public class ThreadUnsafeCounterManualTest {
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenUnsafeCounterIncrement() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
UnsafeCounter unsafeCounter = new UnsafeCounter();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(unsafeCounter::increment));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, unsafeCounter.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.baeldung.concurrent.copyonwrite;
|
||||
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
|
||||
public class CopyOnWriteArrayListUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenCopyOnWriteList_whenIterateAndAddElementToUnderneathList_thenShouldNotChangeIterator() {
|
||||
//given
|
||||
final CopyOnWriteArrayList<Integer> numbers =
|
||||
new CopyOnWriteArrayList<>(new Integer[]{1, 3, 5, 8});
|
||||
|
||||
//when
|
||||
Iterator<Integer> iterator = numbers.iterator();
|
||||
numbers.add(10);
|
||||
|
||||
//then
|
||||
List<Integer> result = new LinkedList<>();
|
||||
iterator.forEachRemaining(result::add);
|
||||
assertThat(result).containsOnly(1, 3, 5, 8);
|
||||
|
||||
//and
|
||||
Iterator<Integer> iterator2 = numbers.iterator();
|
||||
List<Integer> result2 = new LinkedList<>();
|
||||
iterator2.forEachRemaining(result2::add);
|
||||
|
||||
//then
|
||||
assertThat(result2).containsOnly(1, 3, 5, 8, 10);
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = UnsupportedOperationException.class)
|
||||
public void givenCopyOnWriteList_whenIterateOverItAndTryToRemoveElement_thenShouldThrowException() {
|
||||
//given
|
||||
final CopyOnWriteArrayList<Integer> numbers =
|
||||
new CopyOnWriteArrayList<>(new Integer[]{1, 3, 5, 8});
|
||||
|
||||
//when
|
||||
Iterator<Integer> iterator = numbers.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class CountdownLatchExampleIntegrationTest {
|
||||
@Test
|
||||
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException {
|
||||
// Given
|
||||
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
|
||||
CountDownLatch countDownLatch = new CountDownLatch(5);
|
||||
List<Thread> workers = Stream.generate(() -> new Thread(new Worker(outputScraper, countDownLatch))).limit(5).collect(toList());
|
||||
|
||||
// When
|
||||
workers.forEach(Thread::start);
|
||||
countDownLatch.await(); // Block until workers finish
|
||||
outputScraper.add("Latch released");
|
||||
|
||||
// Then
|
||||
outputScraper.forEach(Object::toString);
|
||||
assertThat(outputScraper).containsExactly("Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFailingToParallelProcess_thenMainThreadShouldTimeout() throws InterruptedException {
|
||||
// Given
|
||||
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
|
||||
CountDownLatch countDownLatch = new CountDownLatch(5);
|
||||
List<Thread> workers = Stream.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))).limit(5).collect(toList());
|
||||
|
||||
// When
|
||||
workers.forEach(Thread::start);
|
||||
final boolean result = countDownLatch.await(3L, TimeUnit.SECONDS);
|
||||
|
||||
// Then
|
||||
assertThat(result).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException {
|
||||
// Given
|
||||
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
|
||||
CountDownLatch readyThreadCounter = new CountDownLatch(5);
|
||||
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
|
||||
CountDownLatch completedThreadCounter = new CountDownLatch(5);
|
||||
List<Thread> workers = Stream.generate(() -> new Thread(new WaitingWorker(outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))).limit(5).collect(toList());
|
||||
|
||||
// When
|
||||
workers.forEach(Thread::start);
|
||||
readyThreadCounter.await(); // Block until workers start
|
||||
outputScraper.add("Workers ready");
|
||||
callingThreadBlocker.countDown(); // Start workers
|
||||
completedThreadCounter.await(); // Block until workers finish
|
||||
outputScraper.add("Workers complete");
|
||||
|
||||
// Then
|
||||
outputScraper.forEach(Object::toString);
|
||||
assertThat(outputScraper).containsExactly("Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
package com.baeldung.concurrent.delayqueue;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class DelayQueueIntegrationTest {
|
||||
@Test
|
||||
public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
BlockingQueue<DelayObject> queue = new DelayQueue<>();
|
||||
int numberOfElementsToProduce = 2;
|
||||
int delayOfEachProducedMessageMilliseconds = 500;
|
||||
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
|
||||
DelayQueueProducer producer
|
||||
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
|
||||
|
||||
//when
|
||||
executor.submit(producer);
|
||||
executor.submit(consumer);
|
||||
|
||||
//then
|
||||
executor.awaitTermination(5, TimeUnit.SECONDS);
|
||||
executor.shutdown();
|
||||
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenProduceElementWithHugeDelay_thenConsumerWasNotAbleToConsumeMessageInGivenTime() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
BlockingQueue<DelayObject> queue = new DelayQueue<>();
|
||||
int numberOfElementsToProduce = 1;
|
||||
int delayOfEachProducedMessageMilliseconds = 10_000;
|
||||
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
|
||||
DelayQueueProducer producer
|
||||
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
|
||||
|
||||
//when
|
||||
executor.submit(producer);
|
||||
executor.submit(consumer);
|
||||
|
||||
//then
|
||||
executor.awaitTermination(5, TimeUnit.SECONDS);
|
||||
executor.shutdown();
|
||||
assertEquals(consumer.numberOfConsumedElements.get(), 0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenProduceElementWithNegativeDelay_thenConsumeMessageImmediately() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
BlockingQueue<DelayObject> queue = new DelayQueue<>();
|
||||
int numberOfElementsToProduce = 1;
|
||||
int delayOfEachProducedMessageMilliseconds = -10_000;
|
||||
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
|
||||
DelayQueueProducer producer
|
||||
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
|
||||
|
||||
//when
|
||||
executor.submit(producer);
|
||||
executor.submit(consumer);
|
||||
|
||||
//then
|
||||
executor.awaitTermination(1, TimeUnit.SECONDS);
|
||||
executor.shutdown();
|
||||
assertEquals(consumer.numberOfConsumedElements.get(), 1);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.concurrent.future;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FactorialSquareCalculatorUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenCalculatesFactorialSquare_thenReturnCorrectValue() {
|
||||
ForkJoinPool forkJoinPool = new ForkJoinPool();
|
||||
|
||||
FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);
|
||||
|
||||
forkJoinPool.execute(calculator);
|
||||
|
||||
assertEquals("The sum of the squares from 1 to 10 is 385", 385, calculator.join().intValue());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.baeldung.concurrent.future;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SquareCalculatorIntegrationTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SquareCalculatorIntegrationTest.class);
|
||||
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private long start;
|
||||
|
||||
private SquareCalculator squareCalculator;
|
||||
|
||||
@Test
|
||||
public void givenExecutorIsSingleThreaded_whenTwoExecutionsAreTriggered_thenRunInSequence() throws InterruptedException, ExecutionException {
|
||||
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
|
||||
|
||||
Future<Integer> result1 = squareCalculator.calculate(4);
|
||||
Future<Integer> result2 = squareCalculator.calculate(1000);
|
||||
|
||||
while (!result1.isDone() || !result2.isDone()) {
|
||||
LOG.debug(String.format("Task 1 is %s and Task 2 is %s.", result1.isDone() ? "done" : "not done", result2.isDone() ? "done" : "not done"));
|
||||
|
||||
Thread.sleep(300);
|
||||
}
|
||||
|
||||
assertEquals(16, result1.get().intValue());
|
||||
assertEquals(1000000, result2.get().intValue());
|
||||
}
|
||||
|
||||
@Test(expected = TimeoutException.class)
|
||||
public void whenGetWithTimeoutLowerThanExecutionTime_thenThrowException() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
|
||||
|
||||
Future<Integer> result = squareCalculator.calculate(4);
|
||||
|
||||
result.get(500, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenExecutorIsMultiThreaded_whenTwoExecutionsAreTriggered_thenRunInParallel() throws InterruptedException, ExecutionException {
|
||||
squareCalculator = new SquareCalculator(Executors.newFixedThreadPool(2));
|
||||
|
||||
Future<Integer> result1 = squareCalculator.calculate(4);
|
||||
Future<Integer> result2 = squareCalculator.calculate(1000);
|
||||
|
||||
while (!result1.isDone() || !result2.isDone()) {
|
||||
LOG.debug(String.format("Task 1 is %s and Task 2 is %s.", result1.isDone() ? "done" : "not done", result2.isDone() ? "done" : "not done"));
|
||||
|
||||
Thread.sleep(300);
|
||||
}
|
||||
|
||||
assertEquals(16, result1.get().intValue());
|
||||
assertEquals(1000000, result2.get().intValue());
|
||||
}
|
||||
|
||||
@Test(expected = CancellationException.class)
|
||||
public void whenCancelFutureAndCallGet_thenThrowException() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
|
||||
|
||||
Future<Integer> result = squareCalculator.calculate(4);
|
||||
|
||||
boolean canceled = result.cancel(true);
|
||||
|
||||
assertTrue("Future was canceled", canceled);
|
||||
assertTrue("Future was canceled", result.isCancelled());
|
||||
|
||||
result.get();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void start() {
|
||||
start = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@After
|
||||
public void end() {
|
||||
LOG.debug(String.format("Test %s took %s ms \n", name.getMethodName(), System.currentTimeMillis() - start));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class SharedObjectWithLockManualTest {
|
||||
|
||||
@Test
|
||||
public void whenLockAcquired_ThenLockedIsTrue() {
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
final int threadCount = 2;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(true, object.isLocked());
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenLocked_ThenQueuedThread() {
|
||||
final int threadCount = 4;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(object.hasQueuedThreads(), true);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
public void whenTryLock_ThenQueuedThread() {
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
final int threadCount = 2;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(true, object.isLocked());
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenGetCount_ThenCorrectCount() throws InterruptedException {
|
||||
final int threadCount = 4;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
Thread.sleep(1000);
|
||||
assertEquals(object.getCounter(), 4);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
private void executeThreads(SharedObjectWithLock object, int threadCount, ExecutorService service) {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
service.execute(object::perform);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class SynchronizedHashMapWithRWLockManualTest {
|
||||
|
||||
@Test
|
||||
public void whenWriting_ThenNoReading() {
|
||||
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
|
||||
final int threadCount = 3;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeWriterThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(object.isReadLockAvailable(), false);
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenReading_ThenMultipleReadingAllowed() {
|
||||
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
|
||||
final int threadCount = 5;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeReaderThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(object.isReadLockAvailable(), true);
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
private void executeWriterThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
service.execute(() -> {
|
||||
try {
|
||||
object.put("key" + threadCount, "value" + threadCount);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void executeReaderThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
|
||||
for (int i = 0; i < threadCount; i++)
|
||||
service.execute(() -> {
|
||||
object.get("key" + threadCount);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.concurrent.phaser;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Phaser;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class PhaserUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenPhaser_whenCoordinateWorksBetweenThreads_thenShouldCoordinateBetweenMultiplePhases() {
|
||||
//given
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
Phaser ph = new Phaser(1);
|
||||
assertEquals(0, ph.getPhase());
|
||||
|
||||
//when
|
||||
executorService.submit(new LongRunningAction("thread-1", ph));
|
||||
executorService.submit(new LongRunningAction("thread-2", ph));
|
||||
executorService.submit(new LongRunningAction("thread-3", ph));
|
||||
|
||||
//then
|
||||
ph.arriveAndAwaitAdvance();
|
||||
assertEquals(1, ph.getPhase());
|
||||
|
||||
//and
|
||||
executorService.submit(new LongRunningAction("thread-4", ph));
|
||||
executorService.submit(new LongRunningAction("thread-5", ph));
|
||||
ph.arriveAndAwaitAdvance();
|
||||
assertEquals(2, ph.getPhase());
|
||||
|
||||
|
||||
ph.arriveAndDeregister();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.baeldung.concurrent.priorityblockingqueue;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.util.Lists.newArrayList;
|
||||
|
||||
public class PriorityBlockingQueueIntegrationTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PriorityBlockingQueueIntegrationTest.class);
|
||||
|
||||
|
||||
@Test
|
||||
public void givenUnorderedValues_whenPolling_thenShouldOrderQueue() throws InterruptedException {
|
||||
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
|
||||
ArrayList<Integer> polledElements = new ArrayList<>();
|
||||
|
||||
queue.add(1);
|
||||
queue.add(5);
|
||||
queue.add(2);
|
||||
queue.add(3);
|
||||
queue.add(4);
|
||||
|
||||
queue.drainTo(polledElements);
|
||||
|
||||
assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenPollingEmptyQueue_thenShouldBlockThread() throws InterruptedException {
|
||||
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
|
||||
|
||||
final Thread thread = new Thread(() -> {
|
||||
LOG.debug("Polling...");
|
||||
while (true) {
|
||||
try {
|
||||
Integer poll = queue.take();
|
||||
LOG.debug("Polled: " + poll);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
|
||||
LOG.debug("Adding to queue");
|
||||
|
||||
queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,114 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SemaphoresManualTest {
|
||||
|
||||
// ========= login queue ======
|
||||
|
||||
@Test
|
||||
public void givenLoginQueue_whenReachLimit_thenBlocked() {
|
||||
final int slots = 10;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(loginQueue::tryLogin));
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(0, loginQueue.availableSlots());
|
||||
assertFalse(loginQueue.tryLogin());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
|
||||
final int slots = 10;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(loginQueue::tryLogin));
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(0, loginQueue.availableSlots());
|
||||
loginQueue.logout();
|
||||
assertTrue(loginQueue.availableSlots() > 0);
|
||||
assertTrue(loginQueue.tryLogin());
|
||||
}
|
||||
|
||||
// ========= delay queue =======
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenReachLimit_thenBlocked() {
|
||||
final int slots = 50;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(delayQueue::tryAdd));
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(0, delayQueue.availableSlots());
|
||||
assertFalse(delayQueue.tryAdd());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
|
||||
final int slots = 50;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(delayQueue::tryAdd));
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(0, delayQueue.availableSlots());
|
||||
Thread.sleep(1000);
|
||||
assertTrue(delayQueue.availableSlots() > 0);
|
||||
assertTrue(delayQueue.tryAdd());
|
||||
}
|
||||
|
||||
// ========== mutex ========
|
||||
|
||||
@Test
|
||||
public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException {
|
||||
final int count = 5;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(count);
|
||||
final CounterUsingMutex counter = new CounterUsingMutex();
|
||||
IntStream.range(0, count)
|
||||
.forEach(user -> executorService.execute(() -> {
|
||||
try {
|
||||
counter.increase();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
executorService.shutdown();
|
||||
|
||||
assertTrue(counter.hasQueuedThreads());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException {
|
||||
final int count = 5;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(count);
|
||||
final CounterUsingMutex counter = new CounterUsingMutex();
|
||||
IntStream.range(0, count)
|
||||
.forEach(user -> executorService.execute(() -> {
|
||||
try {
|
||||
counter.increase();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
executorService.shutdown();
|
||||
assertTrue(counter.hasQueuedThreads());
|
||||
Thread.sleep(5000);
|
||||
assertFalse(counter.hasQueuedThreads());
|
||||
assertEquals(count, counter.getCount());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package com.baeldung.concurrent.skiplist;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ConcurrentSkipListSetIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void givenThreadsProducingEvents_whenGetForEventsFromLastMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(3);
|
||||
EventWindowSort eventWindowSort = new EventWindowSort();
|
||||
int numberOfThreads = 2;
|
||||
//when
|
||||
Runnable producer = () -> IntStream
|
||||
.rangeClosed(0, 100)
|
||||
.forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime
|
||||
.now()
|
||||
.minusSeconds(index), UUID
|
||||
.randomUUID()
|
||||
.toString())));
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(producer);
|
||||
}
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute = eventWindowSort.getEventsFromLastMinute();
|
||||
|
||||
long eventsOlderThanOneMinute = eventsFromLastMinute
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(e -> e
|
||||
.getKey()
|
||||
.isBefore(ZonedDateTime
|
||||
.now()
|
||||
.minusMinutes(1)))
|
||||
.count();
|
||||
assertEquals(eventsOlderThanOneMinute, 0);
|
||||
|
||||
long eventYoungerThanOneMinute = eventsFromLastMinute
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(e -> e
|
||||
.getKey()
|
||||
.isAfter(ZonedDateTime
|
||||
.now()
|
||||
.minusMinutes(1)))
|
||||
.count();
|
||||
|
||||
//then
|
||||
assertTrue(eventYoungerThanOneMinute > 0);
|
||||
|
||||
executorService.awaitTermination(1, TimeUnit.SECONDS);
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenThreadsProducingEvents_whenGetForEventsOlderThanOneMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(3);
|
||||
EventWindowSort eventWindowSort = new EventWindowSort();
|
||||
int numberOfThreads = 2;
|
||||
//when
|
||||
Runnable producer = () -> IntStream
|
||||
.rangeClosed(0, 100)
|
||||
.forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime
|
||||
.now()
|
||||
.minusSeconds(index), UUID
|
||||
.randomUUID()
|
||||
.toString())));
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(producer);
|
||||
}
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute = eventWindowSort.getEventsOlderThatOneMinute();
|
||||
|
||||
long eventsOlderThanOneMinute = eventsFromLastMinute
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(e -> e
|
||||
.getKey()
|
||||
.isBefore(ZonedDateTime
|
||||
.now()
|
||||
.minusMinutes(1)))
|
||||
.count();
|
||||
assertTrue(eventsOlderThanOneMinute > 0);
|
||||
|
||||
long eventYoungerThanOneMinute = eventsFromLastMinute
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(e -> e
|
||||
.getKey()
|
||||
.isAfter(ZonedDateTime
|
||||
.now()
|
||||
.minusMinutes(1)))
|
||||
.count();
|
||||
|
||||
//then
|
||||
assertEquals(eventYoungerThanOneMinute, 0);
|
||||
|
||||
executorService.awaitTermination(1, TimeUnit.SECONDS);
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.baeldung.concurrent.synchronize;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class BaeldungSychronizedBlockTest {
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenBlockSync() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
BaeldungSynchronizedBlocks synchronizedBlocks = new BaeldungSynchronizedBlocks();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(synchronizedBlocks::performSynchronisedTask));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, synchronizedBlocks.getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenStaticSyncBlock() throws InterruptedException {
|
||||
ExecutorService service = Executors.newCachedThreadPool();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(BaeldungSynchronizedBlocks::performStaticSyncTask));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, BaeldungSynchronizedBlocks.getStaticCount());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.baeldung.concurrent.synchronize;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class BaeldungSynchronizeMethodsTest {
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void givenMultiThread_whenNonSyncMethod() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
BaeldungSynchronizedMethods method = new BaeldungSynchronizedMethods();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(method::calculate));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, method.getSum());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenMethodSync() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
BaeldungSynchronizedMethods method = new BaeldungSynchronizedMethods();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(method::synchronisedCalculate));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, method.getSyncSum());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenStaticSyncMethod() throws InterruptedException {
|
||||
ExecutorService service = Executors.newCachedThreadPool();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(BaeldungSynchronizedMethods::syncStaticCalculate));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, BaeldungSynchronizedMethods.staticSum);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package com.baeldung.concurrent.volatilekeyword;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static junit.framework.Assert.assertEquals;
|
||||
|
||||
public class SharedObjectManualTest {
|
||||
|
||||
private SharedObject sharedObject;
|
||||
private int valueReadByThread2;
|
||||
private int valueReadByThread3;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
sharedObject = new SharedObject();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenOneThreadWrites_thenVolatileReadsFromMainMemory() throws InterruptedException {
|
||||
Thread writer = new Thread(() -> sharedObject.increamentCount());
|
||||
writer.start();
|
||||
|
||||
|
||||
Thread readerOne = new Thread(() -> {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
valueReadByThread2 = sharedObject.getCount();
|
||||
});
|
||||
readerOne.start();
|
||||
|
||||
Thread readerTwo = new Thread(() -> {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
valueReadByThread3 = sharedObject.getCount();
|
||||
});
|
||||
readerTwo.start();
|
||||
|
||||
assertEquals(1, valueReadByThread2);
|
||||
assertEquals(1, valueReadByThread3);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenTwoThreadWrites_thenVolatileReadsFromMainMemory() throws InterruptedException {
|
||||
Thread writerOne = new Thread(() -> sharedObject.increamentCount());
|
||||
writerOne.start();
|
||||
Thread.sleep(100);
|
||||
|
||||
Thread writerTwo = new Thread(() -> sharedObject.increamentCount());
|
||||
writerTwo.start();
|
||||
Thread.sleep(100);
|
||||
|
||||
Thread readerOne = new Thread(() -> valueReadByThread2 = sharedObject.getCount());
|
||||
readerOne.start();
|
||||
|
||||
Thread readerTwo = new Thread(() -> valueReadByThread3 = sharedObject.getCount());
|
||||
readerTwo.start();
|
||||
|
||||
assertEquals(2, valueReadByThread2);
|
||||
assertEquals(2, valueReadByThread3);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package com.baeldung.java.concurrentmap;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
public class ConcurrentMapAggregateStatusManualTest {
|
||||
|
||||
private ExecutorService executorService;
|
||||
private Map<String, Integer> concurrentMap;
|
||||
private List<Integer> mapSizes;
|
||||
private int MAX_SIZE = 100000;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
executorService = Executors.newFixedThreadPool(2);
|
||||
concurrentMap = new ConcurrentHashMap<>();
|
||||
mapSizes = new ArrayList<>(MAX_SIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenConcurrentMap_whenSizeWithoutConcurrentUpdates_thenCorrect() throws InterruptedException {
|
||||
Runnable collectMapSizes = () -> {
|
||||
for (int i = 0; i < MAX_SIZE; i++) {
|
||||
concurrentMap.put(String.valueOf(i), i);
|
||||
mapSizes.add(concurrentMap.size());
|
||||
}
|
||||
};
|
||||
Runnable retrieveMapData = () -> {
|
||||
for (int i = 0; i < MAX_SIZE; i++) {
|
||||
concurrentMap.get(String.valueOf(i));
|
||||
}
|
||||
};
|
||||
executorService.execute(retrieveMapData);
|
||||
executorService.execute(collectMapSizes);
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(1, TimeUnit.MINUTES);
|
||||
|
||||
for (int i = 1; i <= MAX_SIZE; i++) {
|
||||
assertEquals("map size should be consistently reliable", i, mapSizes.get(i - 1)
|
||||
.intValue());
|
||||
}
|
||||
assertEquals(MAX_SIZE, concurrentMap.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() throws InterruptedException {
|
||||
Runnable collectMapSizes = () -> {
|
||||
for (int i = 0; i < MAX_SIZE; i++) {
|
||||
mapSizes.add(concurrentMap.size());
|
||||
}
|
||||
};
|
||||
Runnable updateMapData = () -> {
|
||||
for (int i = 0; i < MAX_SIZE; i++) {
|
||||
concurrentMap.put(String.valueOf(i), i);
|
||||
}
|
||||
};
|
||||
executorService.execute(updateMapData);
|
||||
executorService.execute(collectMapSizes);
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(1, TimeUnit.MINUTES);
|
||||
|
||||
assertNotEquals("map size collected with concurrent updates not reliable", MAX_SIZE, mapSizes.get(MAX_SIZE - 1)
|
||||
.intValue());
|
||||
assertEquals(MAX_SIZE, concurrentMap.size());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
package com.baeldung.java.concurrentmap;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
public class ConcurrentMapNullKeyValueManualTest {
|
||||
|
||||
ConcurrentMap<String, Object> concurrentMap;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
concurrentMap = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenGetWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.get(null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenGetOrDefaultWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.getOrDefault(null, new Object());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.put(null, new Object());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
|
||||
concurrentMap.put("test", null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMapAndKeyAbsent_whenPutWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.putIfAbsent(null, new Object());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMapAndMapWithNullKey_whenPutNullKeyMap_thenThrowsNPE() {
|
||||
Map<String, Object> nullKeyMap = new HashMap<>();
|
||||
nullKeyMap.put(null, new Object());
|
||||
concurrentMap.putAll(nullKeyMap);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMapAndMapWithNullValue_whenPutNullValueMap_thenThrowsNPE() {
|
||||
Map<String, Object> nullValueMap = new HashMap<>();
|
||||
nullValueMap.put("test", null);
|
||||
concurrentMap.putAll(nullValueMap);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenReplaceNullKeyWithValues_thenThrowsNPE() {
|
||||
concurrentMap.replace(null, new Object(), new Object());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenReplaceWithNullNewValue_thenThrowsNPE() {
|
||||
Object o = new Object();
|
||||
concurrentMap.replace("test", o, null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenReplaceOldNullValue_thenThrowsNPE() {
|
||||
Object o = new Object();
|
||||
concurrentMap.replace("test", null, o);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenReplaceWithNullValue_thenThrowsNPE() {
|
||||
concurrentMap.replace("test", null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenReplaceNullKey_thenThrowsNPE() {
|
||||
concurrentMap.replace(null, "test");
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenReplaceAllMappingNull_thenThrowsNPE() {
|
||||
concurrentMap.put("test", new Object());
|
||||
concurrentMap.replaceAll((s, o) -> null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenRemoveNullKey_thenThrowsNPE() {
|
||||
concurrentMap.remove(null);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenRemoveNullKeyWithValue_thenThrowsNPE() {
|
||||
concurrentMap.remove(null, new Object());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenMergeNullKeyWithValue_thenThrowsNPE() {
|
||||
concurrentMap.merge(null, new Object(), (o, o2) -> o2);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenMergeKeyWithNullValue_thenThrowsNPE() {
|
||||
concurrentMap.put("test", new Object());
|
||||
concurrentMap.merge("test", null, (o, o2) -> o2);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMapAndAssumeKeyAbsent_whenComputeWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.computeIfAbsent(null, s -> s);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMapAndAssumeKeyPresent_whenComputeWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.computeIfPresent(null, (s, o) -> o);
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void givenConcurrentHashMap_whenComputeWithNullKey_thenThrowsNPE() {
|
||||
concurrentMap.compute(null, (s, o) -> o);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenConcurrentHashMap_whenMergeKeyRemappingNull_thenRemovesMapping() {
|
||||
Object oldValue = new Object();
|
||||
concurrentMap.put("test", oldValue);
|
||||
concurrentMap.merge("test", new Object(), (o, o2) -> null);
|
||||
assertNull(concurrentMap.get("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenConcurrentHashMapAndKeyAbsent_whenComputeWithKeyRemappingNull_thenRemainsAbsent() {
|
||||
concurrentMap.computeIfPresent("test", (s, o) -> null);
|
||||
assertNull(concurrentMap.get("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenKeyPresent_whenComputeIfPresentRemappingNull_thenMappingRemoved() {
|
||||
Object oldValue = new Object();
|
||||
concurrentMap.put("test", oldValue);
|
||||
concurrentMap.computeIfPresent("test", (s, o) -> null);
|
||||
assertNull(concurrentMap.get("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
|
||||
Object oldValue = new Object();
|
||||
concurrentMap.put("test", oldValue);
|
||||
concurrentMap.compute("test", (s, o) -> null);
|
||||
assertNull(concurrentMap.get("test"));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.baeldung.java.concurrentmap;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Hashtable;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class ConcurrentMapPerformanceManualTest {
|
||||
|
||||
@Test
|
||||
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() throws Exception {
|
||||
final Map<String, Object> hashtable = new Hashtable<>();
|
||||
final Map<String, Object> synchronizedHashMap = Collections.synchronizedMap(new HashMap<>());
|
||||
final Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();
|
||||
|
||||
final long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
|
||||
final long syncHashMapAvgRuntime = timeElapseForGetPut(synchronizedHashMap);
|
||||
final long concurrentHashMapAvgRuntime = timeElapseForGetPut(concurrentHashMap);
|
||||
|
||||
System.out.println(String.format("Hashtable: %s, syncHashMap: %s, ConcurrentHashMap: %s", hashtableAvgRuntime, syncHashMapAvgRuntime, concurrentHashMapAvgRuntime));
|
||||
|
||||
assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
|
||||
assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
|
||||
|
||||
}
|
||||
|
||||
private long timeElapseForGetPut(Map<String, Object> map) throws InterruptedException {
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(4);
|
||||
final long startTime = System.nanoTime();
|
||||
for (int i = 0; i < 4; i++) {
|
||||
executorService.execute(() -> {
|
||||
for (int j = 0; j < 500_000; j++) {
|
||||
final int value = ThreadLocalRandom.current().nextInt(10000);
|
||||
final String key = String.valueOf(value);
|
||||
map.put(key, value);
|
||||
map.get(key);
|
||||
}
|
||||
});
|
||||
}
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(1, TimeUnit.MINUTES);
|
||||
return (System.nanoTime() - startTime) / 500_000;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenConcurrentMap_whenKeyWithSameHashCode_thenPerformanceDegrades() throws InterruptedException {
|
||||
class SameHash {
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
final int executeTimes = 5000;
|
||||
|
||||
final Map<SameHash, Integer> mapOfSameHash = new ConcurrentHashMap<>();
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
final long sameHashStartTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
executorService.execute(() -> {
|
||||
for (int j = 0; j < executeTimes; j++) {
|
||||
mapOfSameHash.put(new SameHash(), 1);
|
||||
}
|
||||
});
|
||||
}
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
final long mapOfSameHashDuration = System.currentTimeMillis() - sameHashStartTime;
|
||||
final Map<Object, Integer> mapOfDefaultHash = new ConcurrentHashMap<>();
|
||||
executorService = Executors.newFixedThreadPool(2);
|
||||
final long defaultHashStartTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
executorService.execute(() -> {
|
||||
for (int j = 0; j < executeTimes; j++) {
|
||||
mapOfDefaultHash.put(new Object(), 1);
|
||||
}
|
||||
});
|
||||
}
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
final long mapOfDefaultHashDuration = System.currentTimeMillis() - defaultHashStartTime;
|
||||
assertEquals(executeTimes * 2, mapOfDefaultHash.size());
|
||||
assertEquals(executeTimes * 2, mapOfSameHash.size());
|
||||
System.out.println(String.format("same-hash: %s, default-hash: %s", mapOfSameHashDuration, mapOfDefaultHashDuration));
|
||||
assertTrue("same hashCode() should greatly degrade performance", mapOfSameHashDuration > (mapOfDefaultHashDuration * 10));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
package com.baeldung.java.concurrentmap;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ConcurrentNavigableMapManualTest {
|
||||
|
||||
@Test
|
||||
public void givenSkipListMap_whenAccessInMultiThreads_thenOrderingStable() throws InterruptedException {
|
||||
NavigableMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
|
||||
|
||||
updateMapConcurrently(skipListMap, 4);
|
||||
|
||||
Iterator<Integer> skipListIter = skipListMap.keySet().iterator();
|
||||
int previous = skipListIter.next();
|
||||
while (skipListIter.hasNext()) {
|
||||
int current = skipListIter.next();
|
||||
assertTrue(previous < current);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMapConcurrently(NavigableMap<Integer, String> navigableMap, int concurrencyLevel) throws InterruptedException {
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
|
||||
for (int i = 0; i < concurrencyLevel; i++) {
|
||||
executorService.execute(() -> {
|
||||
ThreadLocalRandom random = ThreadLocalRandom.current();
|
||||
for (int j = 0; j < 10000; j++) {
|
||||
navigableMap.put(random.nextInt(), "test");
|
||||
}
|
||||
});
|
||||
}
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(1, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() throws InterruptedException {
|
||||
NavigableMap<Integer, Integer> skipListMap = new ConcurrentSkipListMap<>();
|
||||
int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
|
||||
assertEquals(10000 * 4, count);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTreeMap_whenNavConcurrently_thenCountError() throws InterruptedException {
|
||||
NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
|
||||
int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
|
||||
assertNotEquals(10000 * 4, count);
|
||||
}
|
||||
|
||||
private int countMapElementByPollingFirstEntry(NavigableMap<Integer, Integer> navigableMap, int elementCount, int concurrencyLevel) throws InterruptedException {
|
||||
for (int i = 0; i < elementCount * concurrencyLevel; i++) {
|
||||
navigableMap.put(i, i);
|
||||
}
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
|
||||
for (int j = 0; j < concurrencyLevel; j++) {
|
||||
executorService.execute(() -> {
|
||||
for (int i = 0; i < elementCount; i++) {
|
||||
if (navigableMap.pollFirstEntry() != null) {
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(1, TimeUnit.MINUTES);
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package com.baeldung.java.concurrentmap;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class ConcurretMapMemoryConsistencyManualTest {
|
||||
|
||||
@Test
|
||||
public void givenConcurrentMap_whenSumParallel_thenCorrect() throws Exception {
|
||||
Map<String, Integer> map = new ConcurrentHashMap<>();
|
||||
List<Integer> sumList = parallelSum100(map, 1000);
|
||||
assertEquals(1, sumList.stream()
|
||||
.distinct()
|
||||
.count());
|
||||
long wrongResultCount = sumList.stream()
|
||||
.filter(num -> num != 100)
|
||||
.count();
|
||||
assertEquals(0, wrongResultCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenHashtable_whenSumParallel_thenCorrect() throws Exception {
|
||||
Map<String, Integer> map = new Hashtable<>();
|
||||
List<Integer> sumList = parallelSum100(map, 1000);
|
||||
assertEquals(1, sumList.stream()
|
||||
.distinct()
|
||||
.count());
|
||||
long wrongResultCount = sumList.stream()
|
||||
.filter(num -> num != 100)
|
||||
.count();
|
||||
assertEquals(0, wrongResultCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenHashMap_whenSumParallel_thenError() throws Exception {
|
||||
Map<String, Integer> map = new HashMap<>();
|
||||
List<Integer> sumList = parallelSum100(map, 100);
|
||||
assertNotEquals(1, sumList.stream()
|
||||
.distinct()
|
||||
.count());
|
||||
long wrongResultCount = sumList.stream()
|
||||
.filter(num -> num != 100)
|
||||
.count();
|
||||
assertTrue(wrongResultCount > 0);
|
||||
}
|
||||
|
||||
private List<Integer> parallelSum100(Map<String, Integer> map, int executionTimes) throws InterruptedException {
|
||||
List<Integer> sumList = new ArrayList<>(1000);
|
||||
for (int i = 0; i < executionTimes; i++) {
|
||||
map.put("test", 0);
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(4);
|
||||
for (int j = 0; j < 10; j++) {
|
||||
executorService.execute(() -> {
|
||||
for (int k = 0; k < 10; k++)
|
||||
map.computeIfPresent("test", (key, value) -> value + 1);
|
||||
});
|
||||
}
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
sumList.add(map.get("test"));
|
||||
}
|
||||
return sumList;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package com.baeldung.java.concurrentmodification;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.util.Lists.newArrayList;
|
||||
|
||||
public class ConcurrentModificationUnitTest {
|
||||
@Test(expected = ConcurrentModificationException.class)
|
||||
public void givenIterating_whenRemoving_thenThrowException() throws InterruptedException {
|
||||
|
||||
List<Integer> integers = newArrayList(1, 2, 3);
|
||||
|
||||
for (Integer integer : integers) {
|
||||
integers.remove(1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenIterating_whenUsingIteratorRemove_thenNoError() throws InterruptedException {
|
||||
|
||||
List<Integer> integers = newArrayList(1, 2, 3);
|
||||
|
||||
for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext();) {
|
||||
Integer integer = iterator.next();
|
||||
if(integer == 2) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(integers).containsExactly(1, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenIterating_whenUsingRemovalList_thenNoError() throws InterruptedException {
|
||||
|
||||
List<Integer> integers = newArrayList(1, 2, 3);
|
||||
List<Integer> toRemove = newArrayList();
|
||||
|
||||
for (Integer integer : integers) {
|
||||
if(integer == 2) {
|
||||
toRemove.add(integer);
|
||||
}
|
||||
}
|
||||
integers.removeAll(toRemove);
|
||||
|
||||
assertThat(integers).containsExactly(1, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingRemoveIf_thenRemoveElements() throws InterruptedException {
|
||||
|
||||
Collection<Integer> integers = newArrayList(1, 2, 3);
|
||||
|
||||
integers.removeIf(i -> i == 2);
|
||||
|
||||
assertThat(integers).containsExactly(1, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingStream_thenRemoveElements() {
|
||||
Collection<Integer> integers = newArrayList(1, 2, 3);
|
||||
|
||||
List<String> collected = integers
|
||||
.stream()
|
||||
.filter(i -> i != 2)
|
||||
.map(Object::toString)
|
||||
.collect(toList());
|
||||
|
||||
assertThat(collected).containsExactly("1", "3");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,162 @@
|
||||
package com.baeldung.java8;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class Java8ExecutorServiceIntegrationTest {
|
||||
|
||||
private Runnable runnableTask;
|
||||
private Callable<String> callableTask;
|
||||
private List<Callable<String>> callableTasks;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
|
||||
runnableTask = () -> {
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(300);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
callableTask = () -> {
|
||||
TimeUnit.MILLISECONDS.sleep(300);
|
||||
return "Task's execution";
|
||||
};
|
||||
|
||||
callableTasks = new ArrayList<>();
|
||||
callableTasks.add(callableTask);
|
||||
callableTasks.add(callableTask);
|
||||
callableTasks.add(callableTask);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void creationSubmittingTaskShuttingDown_whenShutDown_thenCorrect() {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
executorService.submit(runnableTask);
|
||||
executorService.submit(callableTask);
|
||||
executorService.shutdown();
|
||||
|
||||
assertTrue(executorService.isShutdown());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void creationSubmittingTasksShuttingDownNow_whenShutDownAfterAwating_thenCorrect() {
|
||||
|
||||
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
threadPoolExecutor.submit(callableTask);
|
||||
}
|
||||
|
||||
List<Runnable> notExecutedTasks = smartShutdown(threadPoolExecutor);
|
||||
|
||||
assertTrue(threadPoolExecutor.isShutdown());
|
||||
assertFalse(notExecutedTasks.isEmpty());
|
||||
assertTrue(notExecutedTasks.size() > 0 && notExecutedTasks.size() < 98);
|
||||
}
|
||||
|
||||
private List<Runnable> smartShutdown(ExecutorService executorService) {
|
||||
|
||||
List<Runnable> notExecutedTasks = new ArrayList<>();
|
||||
executorService.shutdown();
|
||||
try {
|
||||
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
|
||||
notExecutedTasks = executorService.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
notExecutedTasks = executorService.shutdownNow();
|
||||
}
|
||||
return notExecutedTasks;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void submittingTasks_whenExecutedOneAndAll_thenCorrect() {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
String result = null;
|
||||
List<Future<String>> futures = new ArrayList<>();
|
||||
try {
|
||||
result = executorService.invokeAny(callableTasks);
|
||||
futures = executorService.invokeAll(callableTasks);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
assertEquals("Task's execution", result);
|
||||
assertTrue(futures.size() == 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void submittingTaskShuttingDown_whenGetExpectedResult_thenCorrect() {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
Future<String> future = executorService.submit(callableTask);
|
||||
String result = null;
|
||||
try {
|
||||
result = future.get();
|
||||
result = future.get(200, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals("Task's execution", result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void submittingTask_whenCanceled_thenCorrect() {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
Future<String> future = executorService.submit(callableTask);
|
||||
|
||||
boolean canceled = future.cancel(true);
|
||||
boolean isCancelled = future.isCancelled();
|
||||
|
||||
executorService.shutdown();
|
||||
|
||||
assertTrue(canceled);
|
||||
assertTrue(isCancelled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void submittingTaskScheduling_whenExecuted_thenCorrect() {
|
||||
|
||||
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
|
||||
String result = null;
|
||||
try {
|
||||
result = resultFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals("Task's execution", result);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
package com.baeldung.synchronousqueue;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class SynchronousQueueIntegrationTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SynchronousQueueIntegrationTest.class);
|
||||
|
||||
|
||||
@Test
|
||||
public void givenTwoThreads_whenWantToExchangeUsingLockGuardedVariable_thenItSucceed() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
AtomicInteger sharedState = new AtomicInteger();
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
Runnable producer = () -> {
|
||||
Integer producedElement = ThreadLocalRandom.current().nextInt();
|
||||
LOG.debug("Saving an element: " + producedElement + " to the exchange point");
|
||||
sharedState.set(producedElement);
|
||||
countDownLatch.countDown();
|
||||
};
|
||||
|
||||
Runnable consumer = () -> {
|
||||
try {
|
||||
countDownLatch.await();
|
||||
Integer consumedElement = sharedState.get();
|
||||
LOG.debug("consumed an element: " + consumedElement + " from the exchange point");
|
||||
} catch (InterruptedException ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
//when
|
||||
executor.execute(producer);
|
||||
executor.execute(consumer);
|
||||
|
||||
//then
|
||||
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executor.shutdown();
|
||||
assertEquals(countDownLatch.getCount(), 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTwoThreads_whenWantToExchangeUsingSynchronousQueue_thenItSucceed() throws InterruptedException {
|
||||
//given
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
final SynchronousQueue<Integer> queue = new SynchronousQueue<>();
|
||||
|
||||
Runnable producer = () -> {
|
||||
Integer producedElement = ThreadLocalRandom.current().nextInt();
|
||||
try {
|
||||
LOG.debug("Saving an element: " + producedElement + " to the exchange point");
|
||||
queue.put(producedElement);
|
||||
} catch (InterruptedException ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
Runnable consumer = () -> {
|
||||
try {
|
||||
Integer consumedElement = queue.take();
|
||||
LOG.debug("consumed an element: " + consumedElement + " from the exchange point");
|
||||
} catch (InterruptedException ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
//when
|
||||
executor.execute(producer);
|
||||
executor.execute(consumer);
|
||||
|
||||
//then
|
||||
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executor.shutdown();
|
||||
assertEquals(queue.size(), 0);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.baeldung.threadlocal;
|
||||
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ThreadLocalIntegrationTest {
|
||||
@Test
|
||||
public void givenThreadThatStoresContextInAMap_whenStartThread_thenShouldSetContextForBothUsers() throws ExecutionException, InterruptedException {
|
||||
//when
|
||||
SharedMapWithUserContext firstUser = new SharedMapWithUserContext(1);
|
||||
SharedMapWithUserContext secondUser = new SharedMapWithUserContext(2);
|
||||
new Thread(firstUser).start();
|
||||
new Thread(secondUser).start();
|
||||
|
||||
Thread.sleep(3000);
|
||||
//then
|
||||
assertEquals(SharedMapWithUserContext.userContextPerUserId.size(), 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenThreadThatStoresContextInThreadLocal_whenStartThread_thenShouldStoreContextInThreadLocal() throws ExecutionException, InterruptedException {
|
||||
//when
|
||||
ThreadLocalWithUserContext firstUser = new ThreadLocalWithUserContext(1);
|
||||
ThreadLocalWithUserContext secondUser = new ThreadLocalWithUserContext(2);
|
||||
new Thread(firstUser).start();
|
||||
new Thread(secondUser).start();
|
||||
|
||||
Thread.sleep(3000);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class CoreThreadPoolIntegrationTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CoreThreadPoolIntegrationTest.class);
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenCallingExecuteWithRunnable_thenRunnableIsExecuted() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(1);
|
||||
|
||||
Executor executor = Executors.newSingleThreadExecutor();
|
||||
executor.execute(() -> {
|
||||
LOG.debug("Hello World");
|
||||
lock.countDown();
|
||||
});
|
||||
|
||||
lock.await(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingExecutorServiceAndFuture_thenCanWaitOnFutureResult() throws InterruptedException, ExecutionException {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
Future<String> future = executorService.submit(() -> "Hello World");
|
||||
String result = future.get();
|
||||
|
||||
assertEquals("Hello World", result);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingFixedThreadPool_thenCoreAndMaximumThreadSizeAreTheSame() {
|
||||
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
|
||||
assertEquals(2, executor.getPoolSize());
|
||||
assertEquals(1, executor.getQueue().size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingCachedThreadPool_thenPoolSizeGrowsUnbounded() {
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
|
||||
assertEquals(3, executor.getPoolSize());
|
||||
assertEquals(0, executor.getQueue().size());
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenUsingSingleThreadPool_thenTasksExecuteSequentially() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(2);
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
executor.submit(() -> {
|
||||
counter.set(1);
|
||||
lock.countDown();
|
||||
});
|
||||
executor.submit(() -> {
|
||||
counter.compareAndSet(1, 2);
|
||||
lock.countDown();
|
||||
});
|
||||
|
||||
lock.await(1000, TimeUnit.MILLISECONDS);
|
||||
assertEquals(2, counter.get());
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenSchedulingTask_thenTaskExecutesWithinGivenPeriod() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(1);
|
||||
|
||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
|
||||
executor.schedule(() -> {
|
||||
LOG.debug("Hello World");
|
||||
lock.countDown();
|
||||
}, 500, TimeUnit.MILLISECONDS);
|
||||
|
||||
lock.await(1000, TimeUnit.MILLISECONDS);
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenSchedulingTaskWithFixedPeriod_thenTaskExecutesMultipleTimes() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(3);
|
||||
|
||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
|
||||
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
|
||||
LOG.debug("Hello World");
|
||||
lock.countDown();
|
||||
}, 500, 100, TimeUnit.MILLISECONDS);
|
||||
|
||||
lock.await();
|
||||
future.cancel(true);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingForkJoinPool_thenSumOfTreeElementsIsCalculatedCorrectly() {
|
||||
|
||||
TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8)));
|
||||
|
||||
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
|
||||
int sum = forkJoinPool.invoke(new CountingTask(tree));
|
||||
|
||||
assertEquals(20, sum);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class GuavaThreadPoolIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void whenExecutingTaskWithDirectExecutor_thenTheTaskIsExecutedInTheCurrentThread() {
|
||||
|
||||
Executor executor = MoreExecutors.directExecutor();
|
||||
|
||||
AtomicBoolean executed = new AtomicBoolean();
|
||||
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
executed.set(true);
|
||||
});
|
||||
|
||||
assertTrue(executed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenJoiningFuturesWithAllAsList_thenCombinedFutureCompletesAfterAllFuturesComplete() throws ExecutionException, InterruptedException {
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
|
||||
|
||||
ListenableFuture<String> future1 = listeningExecutorService.submit(() -> "Hello");
|
||||
ListenableFuture<String> future2 = listeningExecutorService.submit(() -> "World");
|
||||
|
||||
String greeting = Futures.allAsList(future1, future2).get().stream().collect(Collectors.joining(" "));
|
||||
assertEquals("Hello World", greeting);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package com.baeldung.transferqueue;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class TransferQueueIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void whenMultipleConsumersAndProducers_thenProcessAllMessages() throws InterruptedException {
|
||||
//given
|
||||
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
|
||||
ExecutorService exService = Executors.newFixedThreadPool(3);
|
||||
Producer producer1 = new Producer(transferQueue, "1", 3);
|
||||
Producer producer2 = new Producer(transferQueue, "2", 3);
|
||||
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
|
||||
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
|
||||
|
||||
//when
|
||||
exService.execute(producer1);
|
||||
exService.execute(producer2);
|
||||
exService.execute(consumer1);
|
||||
exService.execute(consumer2);
|
||||
|
||||
//then
|
||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
exService.shutdown();
|
||||
|
||||
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
|
||||
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() throws InterruptedException {
|
||||
//given
|
||||
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
|
||||
ExecutorService exService = Executors.newFixedThreadPool(2);
|
||||
Producer producer = new Producer(transferQueue, "1", 3);
|
||||
Consumer consumer = new Consumer(transferQueue, "1", 3);
|
||||
|
||||
//when
|
||||
exService.execute(producer);
|
||||
exService.execute(consumer);
|
||||
|
||||
//then
|
||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
exService.shutdown();
|
||||
|
||||
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
|
||||
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() throws InterruptedException {
|
||||
//given
|
||||
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
|
||||
ExecutorService exService = Executors.newFixedThreadPool(2);
|
||||
Producer producer = new Producer(transferQueue, "1", 3);
|
||||
|
||||
//when
|
||||
exService.execute(producer);
|
||||
|
||||
//then
|
||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
exService.shutdown();
|
||||
|
||||
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package org.baeldung.java.streams;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ThreadPoolInParallelStreamIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() throws InterruptedException, ExecutionException {
|
||||
long firstNum = 1;
|
||||
long lastNum = 1_000_000;
|
||||
|
||||
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed().collect(Collectors.toList());
|
||||
|
||||
ForkJoinPool customThreadPool = new ForkJoinPool(4);
|
||||
long actualTotal = customThreadPool.submit(() -> aList.parallelStream().reduce(0L, Long::sum)).get();
|
||||
|
||||
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenList_whenCallingParallelStream_shouldBeParallelStream() {
|
||||
List<Long> aList = new ArrayList<>();
|
||||
Stream<Long> parallelStream = aList.parallelStream();
|
||||
|
||||
assertTrue(parallelStream.isParallel());
|
||||
}
|
||||
}
|
||||
13
core-java-concurrency/src/test/resources/.gitignore
vendored
Normal file
13
core-java-concurrency/src/test/resources/.gitignore
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
*.class
|
||||
|
||||
#folders#
|
||||
/target
|
||||
/neoDb*
|
||||
/data
|
||||
/src/main/webapp/WEB-INF/classes
|
||||
*/META-INF/*
|
||||
|
||||
# Packaged files #
|
||||
*.jar
|
||||
*.war
|
||||
*.ear
|
||||
Reference in New Issue
Block a user