JAVA-12045 : move java concurrency ebook content code to common module (#12559)

* JAVA-12045 : move java concurrency ebook content code to common module

* JAVA-12045: addressed the review comments .. renamed package names and placed core-java-concurrency-simple module in pom.xml

* JAVA-12045: removed duplicate module after renaming java-simple-module to core-java-simple-module
This commit is contained in:
Keerthi
2022-08-08 23:26:55 +10:00
committed by GitHub
parent 861764512d
commit 0f57319d9c
36 changed files with 45 additions and 8 deletions

View File

@@ -3,8 +3,6 @@
This module contains articles about basic Java concurrency
### Relevant Articles:
- [Guide To CompletableFuture](https://www.baeldung.com/java-completablefuture)
- [A Guide to the Java ExecutorService](https://www.baeldung.com/java-executor-service-tutorial)
- [Guide to java.util.concurrent.Future](https://www.baeldung.com/java-future)
- [Overview of the java.util.concurrent](https://www.baeldung.com/java-util-concurrent)
- [Implementing a Runnable vs Extending a Thread](https://www.baeldung.com/java-runnable-vs-extending-thread)
@@ -12,5 +10,4 @@ This module contains articles about basic Java concurrency
- [ExecutorService Waiting for Threads to Finish](https://www.baeldung.com/java-executor-wait-for-threads)
- [Runnable vs. Callable in Java](https://www.baeldung.com/java-runnable-callable)
- [What is Thread-Safety and How to Achieve it?](https://www.baeldung.com/java-thread-safety)
- [How to Start a Thread in Java](https://www.baeldung.com/java-start-thread)
- [[Next -->]](/core-java-modules/core-java-concurrency-basic-2)

View File

@@ -1,52 +0,0 @@
package com.baeldung.concurrent.Scheduledexecutorservice;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ScheduledExecutorServiceDemo {
private void execute() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
getTasksToRun().apply(executorService);
executorService.shutdown();
}
private void executeWithMultiThread() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
getTasksToRun().apply(executorService);
executorService.shutdown();
}
private Function<ScheduledExecutorService, Void> getTasksToRun() {
return (executorService -> {
ScheduledFuture<?> scheduledFuture1 = executorService.schedule(() -> {
// Task
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture2 = executorService.scheduleAtFixedRate(() -> {
// Task
}, 1, 10, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture3 = executorService.scheduleWithFixedDelay(() -> {
// Task
}, 1, 10, TimeUnit.SECONDS);
ScheduledFuture<String> scheduledFuture4 = executorService.schedule(() -> {
// Task
return "Hellow world";
}, 1, TimeUnit.SECONDS);
return null;
});
}
public static void main(String... args) {
ScheduledExecutorServiceDemo demo = new ScheduledExecutorServiceDemo();
demo.execute();
demo.executeWithMultiThread();
}
}

View File

@@ -1,39 +0,0 @@
package com.baeldung.concurrent.executorservice;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
public class DelayedCallable implements Callable<String> {
private String name;
private long period;
private CountDownLatch latch;
public DelayedCallable(String name, long period, CountDownLatch latch) {
this(name, period);
this.latch = latch;
}
public DelayedCallable(String name, long period) {
this.name = name;
this.period = period;
}
public String call() {
try {
Thread.sleep(period);
if (latch != null) {
latch.countDown();
}
} catch (InterruptedException ex) {
// handle exception
ex.printStackTrace();
Thread.currentThread().interrupt();
}
return name;
}
}

View File

@@ -1,27 +0,0 @@
package com.baeldung.concurrent.executorservice;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void execute() {
executor.submit(() -> {
new Task();
});
executor.shutdown();
executor.shutdownNow();
try {
executor.awaitTermination(20l, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -1,10 +0,0 @@
package com.baeldung.concurrent.executorservice;
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

View File

@@ -1,208 +0,0 @@
package com.baeldung.completablefuture;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CompletableFutureLongRunningUnitTest {
private static final Logger LOG = LoggerFactory.getLogger(CompletableFutureLongRunningUnitTest.class);
@Test
public void whenRunningCompletableFutureAsynchronously_thenGetMethodWaitsForResult() throws InterruptedException, ExecutionException {
Future<String> completableFuture = calculateAsync();
String result = completableFuture.get();
assertEquals("Hello", result);
}
private Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool()
.submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
@Test
public void whenRunningCompletableFutureWithResult_thenGetMethodReturnsImmediately() throws InterruptedException, ExecutionException {
Future<String> completableFuture = CompletableFuture.completedFuture("Hello");
String result = completableFuture.get();
assertEquals("Hello", result);
}
private Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool()
.submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
@Test(expected = CancellationException.class)
public void whenCancelingTheFuture_thenThrowsCancellationException() throws ExecutionException, InterruptedException {
Future<String> future = calculateAsyncWithCancellation();
future.get();
}
@Test
public void whenCreatingCompletableFutureWithSupplyAsync_thenFutureReturnsValue() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
assertEquals("Hello", future.get());
}
@Test
public void whenAddingThenAcceptToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenAccept(s -> LOG.debug("Computation returned: " + s));
future.get();
}
@Test
public void whenAddingThenRunToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenRun(() -> LOG.debug("Computation finished."));
future.get();
}
@Test
public void whenAddingThenApplyToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
}
@Test
public void whenUsingThenCompose_thenFuturesExecuteSequentially() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenCombine_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenAcceptBoth_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> LOG.debug(s1 + s2));
}
@Test
public void whenFutureCombinedWithAllOfCompletes_thenAllFuturesAreDone() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
}
@Test
public void whenFutureThrows_thenHandleMethodReceivesException() throws ExecutionException, InterruptedException {
String name = null;
// ...
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})
.handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
}
@Test(expected = ExecutionException.class)
public void whenCompletingFutureExceptionally_thenGetMethodThrows() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
// ...
completableFuture.get();
}
@Test
public void whenAddingThenApplyAsyncToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
}
@Test
public void whenPassingTransformation_thenFunctionExecutionWithThenApply() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> finalResult = compute().thenApply(s -> s + 1);
assertTrue(finalResult.get() == 11);
}
@Test
public void whenPassingPreviousStage_thenFunctionExecutionWithThenCompose() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
assertTrue(finalResult.get() == 20);
}
public CompletableFuture<Integer> compute(){
return CompletableFuture.supplyAsync(() -> 10);
}
public CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
}

View File

@@ -1,162 +0,0 @@
package com.baeldung.concurrent.executorservice;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
public class Java8ExecutorServiceIntegrationTest {
private Runnable runnableTask;
private Callable<String> callableTask;
private List<Callable<String>> callableTasks;
@Before
public void init() {
runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
}
@Test
public void creationSubmittingTaskShuttingDown_whenShutDown_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(runnableTask);
executorService.submit(callableTask);
executorService.shutdown();
assertTrue(executorService.isShutdown());
}
@Test
public void creationSubmittingTasksShuttingDownNow_whenShutDownAfterAwating_thenCorrect() {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.submit(callableTask);
}
List<Runnable> notExecutedTasks = smartShutdown(threadPoolExecutor);
assertTrue(threadPoolExecutor.isShutdown());
assertFalse(notExecutedTasks.isEmpty());
assertTrue(notExecutedTasks.size() < 98);
}
private List<Runnable> smartShutdown(ExecutorService executorService) {
List<Runnable> notExecutedTasks = new ArrayList<>();
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
notExecutedTasks = executorService.shutdownNow();
}
} catch (InterruptedException e) {
notExecutedTasks = executorService.shutdownNow();
}
return notExecutedTasks;
}
@Test
public void submittingTasks_whenExecutedOneAndAll_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
String result = null;
List<Future<String>> futures = new ArrayList<>();
try {
result = executorService.invokeAny(callableTasks);
futures = executorService.invokeAll(callableTasks);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
assertEquals("Task's execution", result);
assertTrue(futures.size() == 3);
}
@Test
public void submittingTaskShuttingDown_whenGetExpectedResult_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
result = future.get(200, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
executorService.shutdown();
assertEquals("Task's execution", result);
}
@Test
public void submittingTask_whenCanceled_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(callableTask);
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
executorService.shutdown();
assertTrue(canceled);
assertTrue(isCancelled);
}
@Test
public void submittingTaskScheduling_whenExecuted_thenCorrect() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
String result = null;
try {
result = resultFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
assertEquals("Task's execution", result);
}
}

View File

@@ -1,145 +0,0 @@
package com.baeldung.concurrent.executorservice;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import static junit.framework.TestCase.assertTrue;
public class WaitingForThreadsToFinishManualTest {
private static final Logger LOG = LoggerFactory.getLogger(WaitingForThreadsToFinishManualTest.class);
private final static ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
@Test
public void givenMultipleThreads_whenUsingCountDownLatch_thenMainShoudWaitForAllToFinish() {
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
try {
long startTime = System.currentTimeMillis();
// create a CountDownLatch that waits for the 2 threads to finish
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
Thread.sleep(1000);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two threads
latch.await();
long processingTime = System.currentTimeMillis() - startTime;
assertTrue(processingTime >= 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
}
@Test
public void givenMultipleThreads_whenInvokeAll_thenMainThreadShouldWaitForAllToFinish() {
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
try {
long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
try {
WORKER_THREAD_POOL.submit((Callable<String>) () -> {
Thread.sleep(1000000);
return null;
});
} catch (RejectedExecutionException ex) {
//
}
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue(totalProcessingTime >= 3000);
String firstThreadResponse = futures.get(0)
.get();
assertTrue("First response should be from the fast thread", "fast thread".equals(firstThreadResponse));
String secondThreadResponse = futures.get(1)
.get();
assertTrue("Last response should be from the slow thread", "slow thread".equals(secondThreadResponse));
} catch (ExecutionException | InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void givenMultipleThreads_whenUsingCompletionService_thenMainThreadShouldWaitForAllToFinish() {
CompletionService<String> service = new ExecutorCompletionService<>(WORKER_THREAD_POOL);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
for (Callable<String> callable : callables) {
service.submit(callable);
}
try {
long startProcessingTime = System.currentTimeMillis();
Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue("First response should be from the fast thread", "fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100 && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds");
future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue("Last response should be from the slow thread", "slow thread".equals(secondThreadResponse));
assertTrue(totalProcessingTime >= 3000 && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds");
} catch (ExecutionException | InterruptedException ex) {
ex.printStackTrace();
} finally {
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
}
}
}