[BAEL-10837] - Splitted core-java-concurrency module

This commit is contained in:
amit2103
2018-12-25 21:54:46 +05:30
parent 8fbed2e741
commit 45ba2a9387
127 changed files with 256 additions and 136 deletions

26
core-java-concurrency-basic/.gitignore vendored Normal file
View File

@@ -0,0 +1,26 @@
*.class
0.*
#folders#
/target
/neoDb*
/data
/src/main/webapp/WEB-INF/classes
*/META-INF/*
.resourceCache
# Packaged files #
*.jar
*.war
*.ear
# Files generated by integration tests
*.txt
backup-pom.xml
/bin/
/temp
#IntelliJ specific
.idea/
*.iml

View File

@@ -0,0 +1,17 @@
=========
## Core Java Concurrency Basic Examples
### Relevant Articles:
- [Guide To CompletableFuture](http://www.baeldung.com/java-completablefuture)
- [A Guide to the Java ExecutorService](http://www.baeldung.com/java-executor-service-tutorial)
- [Guide to java.util.concurrent.Future](http://www.baeldung.com/java-future)
- [Difference Between Wait and Sleep in Java](http://www.baeldung.com/java-wait-and-sleep)
- [Guide to Synchronized Keyword in Java](http://www.baeldung.com/java-synchronized)
- [Overview of the java.util.concurrent](http://www.baeldung.com/java-util-concurrent)
- [Implementing a Runnable vs Extending a Thread](http://www.baeldung.com/java-runnable-vs-extending-thread)
- [How to Kill a Java Thread](http://www.baeldung.com/java-thread-stop)
- [ExecutorService - Waiting for Threads to Finish](http://www.baeldung.com/java-executor-wait-for-threads)
- [wait and notify() Methods in Java](http://www.baeldung.com/java-wait-notify)
- [Life Cycle of a Thread in Java](http://www.baeldung.com/java-thread-lifecycle)
- [Runnable vs. Callable in Java](http://www.baeldung.com/java-runnable-callable)

View File

@@ -0,0 +1,55 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung</groupId>
<artifactId>core-java-concurrency-basic</artifactId>
<version>0.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>core-java-concurrency-basic</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${avaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>core-java-concurrency-basic</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
<properties>
<!-- util -->
<commons-lang3.version>3.5</commons-lang3.version>
<!-- testing -->
<assertj.version>3.6.1</assertj.version>
<avaitility.version>1.7.0</avaitility.version>
</properties>
</project>

View File

@@ -0,0 +1,52 @@
package com.baeldung.concurrent.Scheduledexecutorservice;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ScheduledExecutorServiceDemo {
private void execute() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
getTasksToRun().apply(executorService);
executorService.shutdown();
}
private void executeWithMultiThread() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
getTasksToRun().apply(executorService);
executorService.shutdown();
}
private Function<ScheduledExecutorService, Void> getTasksToRun() {
return (executorService -> {
ScheduledFuture<?> scheduledFuture1 = executorService.schedule(() -> {
// Task
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture2 = executorService.scheduleAtFixedRate(() -> {
// Task
}, 1, 10, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture3 = executorService.scheduleWithFixedDelay(() -> {
// Task
}, 1, 10, TimeUnit.SECONDS);
ScheduledFuture<String> scheduledFuture4 = executorService.schedule(() -> {
// Task
return "Hellow world";
}, 1, TimeUnit.SECONDS);
return null;
});
}
public static void main(String... args) {
ScheduledExecutorServiceDemo demo = new ScheduledExecutorServiceDemo();
demo.execute();
demo.executeWithMultiThread();
}
}

View File

@@ -0,0 +1,30 @@
package com.baeldung.concurrent.callable;
import java.util.concurrent.Callable;
public class FactorialTask implements Callable<Integer> {
int number;
public FactorialTask(int number) {
this.number = number;
}
public Integer call() throws InvalidParamaterException {
int fact=1;
if(number < 0)
throw new InvalidParamaterException("Number must be positive");
for(int count=number;count>1;count--){
fact=fact * count;
}
return fact;
}
private class InvalidParamaterException extends Exception {
public InvalidParamaterException(String message) {
super(message);
}
}
}

View File

@@ -0,0 +1,24 @@
package com.baeldung.concurrent.cyclicbarrier;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// Task
System.out.println("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
}

View File

@@ -0,0 +1,24 @@
package com.baeldung.concurrent.cyclicbarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println("Thread : " + Thread.currentThread().getName() + " is waiting");
barrier.await();
System.out.println("Thread : " + Thread.currentThread().getName() + " is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,14 @@
package com.baeldung.concurrent.executor;
import java.util.concurrent.Executor;
public class ExecutorDemo {
public void execute() {
Executor executor = new Invoker();
executor.execute(()->{
// task to be performed
});
}
}

View File

@@ -0,0 +1,12 @@
package com.baeldung.concurrent.executor;
import java.util.concurrent.Executor;
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}

View File

@@ -0,0 +1,39 @@
package com.baeldung.concurrent.executorservice;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
public class DelayedCallable implements Callable<String> {
private String name;
private long period;
private CountDownLatch latch;
public DelayedCallable(String name, long period, CountDownLatch latch) {
this(name, period);
this.latch = latch;
}
public DelayedCallable(String name, long period) {
this.name = name;
this.period = period;
}
public String call() {
try {
Thread.sleep(period);
if (latch != null) {
latch.countDown();
}
} catch (InterruptedException ex) {
// handle exception
ex.printStackTrace();
Thread.currentThread().interrupt();
}
return name;
}
}

View File

@@ -0,0 +1,27 @@
package com.baeldung.concurrent.executorservice;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void execute() {
executor.submit(() -> {
new Task();
});
executor.shutdown();
executor.shutdownNow();
try {
executor.awaitTermination(20l, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,10 @@
package com.baeldung.concurrent.executorservice;
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

View File

@@ -0,0 +1,26 @@
package com.baeldung.concurrent.future;
import java.util.concurrent.RecursiveTask;
public class FactorialSquareCalculator extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
final private Integer n;
FactorialSquareCalculator(Integer n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FactorialSquareCalculator calculator = new FactorialSquareCalculator(n - 1);
calculator.fork();
return n * n + calculator.join();
}
}

View File

@@ -0,0 +1,44 @@
package com.baeldung.concurrent.future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureDemo {
public String invoke() {
String str = null;
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// Task
Thread.sleep(10000l);
return "Hellow world";
});
future.cancel(false);
try {
future.get(20, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e1) {
e1.printStackTrace();
}
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return str;
}
}

View File

@@ -0,0 +1,20 @@
package com.baeldung.concurrent.future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
class SquareCalculator {
private final ExecutorService executor;
SquareCalculator(ExecutorService executor) {
this.executor = executor;
}
Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}

View File

@@ -0,0 +1,17 @@
package com.baeldung.concurrent.runnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventLoggingTask implements Runnable{
private Logger logger = LoggerFactory.getLogger(EventLoggingTask.class);
@Override
public void run() {
String messge="Message read from the event queue";
logger.info("Message read from event queue is "+messge);
}
}

View File

@@ -0,0 +1,25 @@
package com.baeldung.concurrent.runnable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TaskRunner {
private static ExecutorService executorService;
public static void main(String[] args) {
executeTask();
}
private static void executeTask() {
executorService= Executors.newSingleThreadExecutor();
EventLoggingTask task = new EventLoggingTask();
Future future = executorService.submit(task);
executorService.shutdown();
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.concurrent.semaphore;
import java.util.concurrent.Semaphore;
public class SemaPhoreDemo {
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
System.out.println("Available permit : " + semaphore.availablePermits());
System.out.println("Number of threads waiting to acquire: " + semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
semaphore.acquire();
// perform some critical operations
semaphore.release();
}
}
}

View File

@@ -0,0 +1,27 @@
package com.baeldung.concurrent.sleepwait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
* Example of waking up a waiting thread
*/
public class ThreadA {
private static final Logger LOG = LoggerFactory.getLogger(ThreadA.class);
private static final ThreadB b = new ThreadB();
public static void main(String... args) throws InterruptedException {
b.start();
synchronized (b) {
while (b.sum == 0) {
LOG.debug("Waiting for ThreadB to complete...");
b.wait();
}
LOG.debug("ThreadB has completed. Sum from that thread is: " + b.sum);
}
}
}

View File

@@ -0,0 +1,20 @@
package com.baeldung.concurrent.sleepwait;
/***
* Example of waking up a waiting thread
*/
class ThreadB extends Thread {
int sum;
@Override
public void run() {
synchronized (this) {
int i = 0;
while (i < 100000) {
sum += i;
i++;
}
notify();
}
}
}

View File

@@ -0,0 +1,29 @@
package com.baeldung.concurrent.sleepwait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
* Example of wait() and sleep() methods
*/
public class WaitSleepExample {
private static final Logger LOG = LoggerFactory.getLogger(WaitSleepExample.class);
private static final Object LOCK = new Object();
public static void main(String... args) throws InterruptedException {
sleepWaitInSynchronizedBlocks();
}
private static void sleepWaitInSynchronizedBlocks() throws InterruptedException {
Thread.sleep(1000); // called on the thread
LOG.debug("Thread '" + Thread.currentThread().getName() + "' is woken after sleeping for 1 second");
synchronized (LOCK) {
LOCK.wait(1000); // called on the object, synchronization required
LOG.debug("Object '" + LOCK + "' is woken after waiting for 1 second");
}
}
}

View File

@@ -0,0 +1,53 @@
package com.baeldung.concurrent.stopping;
import java.util.concurrent.atomic.AtomicBoolean;
public class ControlSubThread implements Runnable {
private Thread worker;
private int interval = 100;
private AtomicBoolean running = new AtomicBoolean(false);
private AtomicBoolean stopped = new AtomicBoolean(true);
public ControlSubThread(int sleepInterval) {
interval = sleepInterval;
}
public void start() {
worker = new Thread(this);
worker.start();
}
public void stop() {
running.set(false);
}
public void interrupt() {
running.set(false);
worker.interrupt();
}
boolean isRunning() {
return running.get();
}
boolean isStopped() {
return stopped.get();
}
public void run() {
running.set(true);
stopped.set(false);
while (running.get()) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread was interrupted, Failed to complete operation");
}
// do something
}
stopped.set(true);
}
}

View File

@@ -0,0 +1,35 @@
package com.baeldung.concurrent.synchronize;
public class BaeldungSynchronizedBlocks {
private int count = 0;
private static int staticCount = 0;
void performSynchronisedTask() {
synchronized (this) {
setCount(getCount() + 1);
}
}
static void performStaticSyncTask() {
synchronized (BaeldungSynchronizedBlocks.class) {
setStaticCount(getStaticCount() + 1);
}
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
static int getStaticCount() {
return staticCount;
}
private static void setStaticCount(int staticCount) {
BaeldungSynchronizedBlocks.staticCount = staticCount;
}
}

View File

@@ -0,0 +1,37 @@
package com.baeldung.concurrent.synchronize;
public class BaeldungSynchronizedMethods {
private int sum = 0;
private int syncSum = 0;
static int staticSum = 0;
void calculate() {
setSum(getSum() + 1);
}
synchronized void synchronisedCalculate() {
setSyncSum(getSyncSum() + 1);
}
static synchronized void syncStaticCalculate() {
staticSum = staticSum + 1;
}
public int getSum() {
return sum;
}
public void setSum(int sum) {
this.sum = sum;
}
int getSyncSum() {
return syncSum;
}
private void setSyncSum(int syncSum) {
this.syncSum = syncSum;
}
}

View File

@@ -0,0 +1,23 @@
package com.baeldung.concurrent.threadfactory;
import java.util.concurrent.ThreadFactory;
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
System.out.println("created new thread with id : " + threadId + " and name : " + t.getName());
threadId++;
return t;
}
}

View File

@@ -0,0 +1,13 @@
package com.baeldung.concurrent.threadfactory;
public class Demo {
public void execute() {
BaeldungThreadFactory factory = new BaeldungThreadFactory("BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}
}
}

View File

@@ -0,0 +1,10 @@
package com.baeldung.concurrent.threadfactory;
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

View File

@@ -0,0 +1,31 @@
package com.baeldung.concurrent.threadlifecycle;
public class BlockedState {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new DemoThreadB());
Thread t2 = new Thread(new DemoThreadB());
t1.start();
t2.start();
Thread.sleep(1000);
System.out.println(t2.getState());
System.exit(0);
}
}
class DemoThreadB implements Runnable {
@Override
public void run() {
commonResource();
}
public static synchronized void commonResource() {
while(true) {
// Infinite loop to mimic heavy processing
// Thread 't1' won't leave this method
// when Thread 't2' enters this
}
}
}

View File

@@ -0,0 +1,14 @@
package com.baeldung.concurrent.threadlifecycle;
public class NewState implements Runnable {
public static void main(String[] args) {
Runnable runnable = new NewState();
Thread t = new Thread(runnable);
System.out.println(t.getState());
}
@Override
public void run() {
}
}

View File

@@ -0,0 +1,15 @@
package com.baeldung.concurrent.threadlifecycle;
public class RunnableState implements Runnable {
public static void main(String[] args) {
Runnable runnable = new NewState();
Thread t = new Thread(runnable);
t.start();
System.out.println(t.getState());
}
@Override
public void run() {
}
}

View File

@@ -0,0 +1,15 @@
package com.baeldung.concurrent.threadlifecycle;
public class TerminatedState implements Runnable {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new TerminatedState());
t1.start();
Thread.sleep(1000);
System.out.println(t1.getState());
}
@Override
public void run() {
// No processing in this block
}
}

View File

@@ -0,0 +1,25 @@
package com.baeldung.concurrent.threadlifecycle;
public class TimedWaitingState {
public static void main(String[] args) throws InterruptedException {
DemoThread obj1 = new DemoThread();
Thread t1 = new Thread(obj1);
t1.start();
// The following sleep will give enough time for ThreadScheduler
// to start processing of thread t1
Thread.sleep(1000);
System.out.println(t1.getState());
}
}
class DemoThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,35 @@
package com.baeldung.concurrent.threadlifecycle;
public class WaitingState implements Runnable {
public static Thread t1;
public static void main(String[] args) {
t1 = new Thread(new WaitingState());
t1.start();
}
public void run() {
Thread t2 = new Thread(new DemoThreadWS());
t2.start();
try {
t2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
class DemoThreadWS implements Runnable {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
System.out.println(WaitingState.t1.getState());
}
}

View File

@@ -0,0 +1,39 @@
package com.baeldung.concurrent.waitandnotify;
public class Data {
private String packet;
// True if receiver should wait
// False if sender should wait
private boolean transfer = true;
public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
transfer = true;
notifyAll();
return packet;
}
public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
transfer = false;
this.packet = packet;
notifyAll();
}
}

View File

@@ -0,0 +1,12 @@
package com.baeldung.concurrent.waitandnotify;
public class NetworkDriver {
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));
sender.start();
receiver.start();
}
}

View File

@@ -0,0 +1,28 @@
package com.baeldung.concurrent.waitandnotify;
import java.util.concurrent.ThreadLocalRandom;
public class Receiver implements Runnable {
private Data load;
public Receiver(Data load) {
this.load = load;
}
public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage) ;
receivedMessage = load.receive()) {
System.out.println(receivedMessage);
//Thread.sleep() to mimic heavy server-side processing
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
}
}

View File

@@ -0,0 +1,33 @@
package com.baeldung.concurrent.waitandnotify;
import java.util.concurrent.ThreadLocalRandom;
public class Sender implements Runnable {
private Data data;
public Sender(Data data) {
this.data = data;
}
public void run() {
String packets[] = {
"First packet",
"Second packet",
"Third packet",
"Fourth packet",
"End"
};
for (String packet : packets) {
data.send(packet);
//Thread.sleep() to mimic heavy server-side processing
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
}
}

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<logger name="org.springframework" level="WARN" />
<logger name="org.springframework.transaction" level="WARN" />
<!-- in order to debug some marshalling issues, this needs to be TRACE -->
<logger name="org.springframework.web.servlet.mvc" level="WARN" />
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@@ -0,0 +1,208 @@
package com.baeldung.completablefuture;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CompletableFutureLongRunningUnitTest {
private static final Logger LOG = LoggerFactory.getLogger(CompletableFutureLongRunningUnitTest.class);
@Test
public void whenRunningCompletableFutureAsynchronously_thenGetMethodWaitsForResult() throws InterruptedException, ExecutionException {
Future<String> completableFuture = calculateAsync();
String result = completableFuture.get();
assertEquals("Hello", result);
}
private Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool()
.submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
@Test
public void whenRunningCompletableFutureWithResult_thenGetMethodReturnsImmediately() throws InterruptedException, ExecutionException {
Future<String> completableFuture = CompletableFuture.completedFuture("Hello");
String result = completableFuture.get();
assertEquals("Hello", result);
}
private Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool()
.submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
@Test(expected = CancellationException.class)
public void whenCancelingTheFuture_thenThrowsCancellationException() throws ExecutionException, InterruptedException {
Future<String> future = calculateAsyncWithCancellation();
future.get();
}
@Test
public void whenCreatingCompletableFutureWithSupplyAsync_thenFutureReturnsValue() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
assertEquals("Hello", future.get());
}
@Test
public void whenAddingThenAcceptToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenAccept(s -> LOG.debug("Computation returned: " + s));
future.get();
}
@Test
public void whenAddingThenRunToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenRun(() -> LOG.debug("Computation finished."));
future.get();
}
@Test
public void whenAddingThenApplyToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
}
@Test
public void whenUsingThenCompose_thenFuturesExecuteSequentially() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenCombine_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenAcceptBoth_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> LOG.debug(s1 + s2));
}
@Test
public void whenFutureCombinedWithAllOfCompletes_thenAllFuturesAreDone() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
}
@Test
public void whenFutureThrows_thenHandleMethodReceivesException() throws ExecutionException, InterruptedException {
String name = null;
// ...
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})
.handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
}
@Test(expected = ExecutionException.class)
public void whenCompletingFutureExceptionally_thenGetMethodThrows() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
// ...
completableFuture.get();
}
@Test
public void whenAddingThenApplyAsyncToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
}
@Test
public void whenPassingTransformation_thenFunctionExecutionWithThenApply() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> finalResult = compute().thenApply(s -> s + 1);
assertTrue(finalResult.get() == 11);
}
@Test
public void whenPassingPreviousStage_thenFunctionExecutionWithThenCompose() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
assertTrue(finalResult.get() == 20);
}
public CompletableFuture<Integer> compute(){
return CompletableFuture.supplyAsync(() -> 10);
}
public CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
}

View File

@@ -0,0 +1,48 @@
package com.baeldung.concurrent.callable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static junit.framework.Assert.assertEquals;
public class FactorialTaskManualTest {
private ExecutorService executorService;
@Before
public void setup(){
executorService = Executors.newSingleThreadExecutor();
}
@Test
public void whenTaskSubmitted_ThenFutureResultObtained() throws ExecutionException, InterruptedException {
FactorialTask task = new FactorialTask(5);
Future<Integer> future= executorService.submit(task);
assertEquals(120,future.get().intValue());
}
@Test(expected = ExecutionException.class)
public void whenException_ThenCallableThrowsIt() throws ExecutionException, InterruptedException {
FactorialTask task = new FactorialTask(-5);
Future<Integer> future= executorService.submit(task);
Integer result=future.get().intValue();
}
@Test
public void whenException_ThenCallableDoesntThrowsItIfGetIsNotCalled(){
FactorialTask task = new FactorialTask(-5);
Future<Integer> future= executorService.submit(task);
assertEquals(false,future.isDone());
}
@After
public void cleanup(){
executorService.shutdown();
}
}

View File

@@ -0,0 +1,145 @@
package com.baeldung.concurrent.executorservice;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import static junit.framework.TestCase.assertTrue;
public class WaitingForThreadsToFinishManualTest {
private static final Logger LOG = LoggerFactory.getLogger(WaitingForThreadsToFinishManualTest.class);
private final static ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
@Test
public void givenMultipleThreads_whenUsingCountDownLatch_thenMainShoudWaitForAllToFinish() {
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
try {
long startTime = System.currentTimeMillis();
// create a CountDownLatch that waits for the 2 threads to finish
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
Thread.sleep(1000);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two threads
latch.await();
long processingTime = System.currentTimeMillis() - startTime;
assertTrue(processingTime >= 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
}
@Test
public void givenMultipleThreads_whenInvokeAll_thenMainThreadShouldWaitForAllToFinish() {
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
try {
long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
try {
WORKER_THREAD_POOL.submit((Callable<String>) () -> {
Thread.sleep(1000000);
return null;
});
} catch (RejectedExecutionException ex) {
//
}
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue(totalProcessingTime >= 3000);
String firstThreadResponse = futures.get(0)
.get();
assertTrue("First response should be from the fast thread", "fast thread".equals(firstThreadResponse));
String secondThreadResponse = futures.get(1)
.get();
assertTrue("Last response should be from the slow thread", "slow thread".equals(secondThreadResponse));
} catch (ExecutionException | InterruptedException ex) {
ex.printStackTrace();
}
}
@Test
public void givenMultipleThreads_whenUsingCompletionService_thenMainThreadShouldWaitForAllToFinish() {
CompletionService<String> service = new ExecutorCompletionService<>(WORKER_THREAD_POOL);
List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));
for (Callable<String> callable : callables) {
service.submit(callable);
}
try {
long startProcessingTime = System.currentTimeMillis();
Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue("First response should be from the fast thread", "fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100 && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds");
future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
assertTrue("Last response should be from the slow thread", "slow thread".equals(secondThreadResponse));
assertTrue(totalProcessingTime >= 3000 && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime + " milliseconds");
} catch (ExecutionException | InterruptedException ex) {
ex.printStackTrace();
} finally {
awaitTerminationAfterShutdown(WORKER_THREAD_POOL);
}
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.concurrent.future;
import org.junit.Test;
import java.util.concurrent.ForkJoinPool;
import static org.junit.Assert.assertEquals;
public class FactorialSquareCalculatorUnitTest {
@Test
public void whenCalculatesFactorialSquare_thenReturnCorrectValue() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);
forkJoinPool.execute(calculator);
assertEquals("The sum of the squares from 1 to 10 is 385", 385, calculator.join().intValue());
}
}

View File

@@ -0,0 +1,98 @@
package com.baeldung.concurrent.future;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SquareCalculatorIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(SquareCalculatorIntegrationTest.class);
@Rule
public TestName name = new TestName();
private long start;
private SquareCalculator squareCalculator;
@Test
public void givenExecutorIsSingleThreaded_whenTwoExecutionsAreTriggered_thenRunInSequence() throws InterruptedException, ExecutionException {
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
Future<Integer> result1 = squareCalculator.calculate(4);
Future<Integer> result2 = squareCalculator.calculate(1000);
while (!result1.isDone() || !result2.isDone()) {
LOG.debug(String.format("Task 1 is %s and Task 2 is %s.", result1.isDone() ? "done" : "not done", result2.isDone() ? "done" : "not done"));
Thread.sleep(300);
}
assertEquals(16, result1.get().intValue());
assertEquals(1000000, result2.get().intValue());
}
@Test(expected = TimeoutException.class)
public void whenGetWithTimeoutLowerThanExecutionTime_thenThrowException() throws InterruptedException, ExecutionException, TimeoutException {
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
Future<Integer> result = squareCalculator.calculate(4);
result.get(500, TimeUnit.MILLISECONDS);
}
@Test
public void givenExecutorIsMultiThreaded_whenTwoExecutionsAreTriggered_thenRunInParallel() throws InterruptedException, ExecutionException {
squareCalculator = new SquareCalculator(Executors.newFixedThreadPool(2));
Future<Integer> result1 = squareCalculator.calculate(4);
Future<Integer> result2 = squareCalculator.calculate(1000);
while (!result1.isDone() || !result2.isDone()) {
LOG.debug(String.format("Task 1 is %s and Task 2 is %s.", result1.isDone() ? "done" : "not done", result2.isDone() ? "done" : "not done"));
Thread.sleep(300);
}
assertEquals(16, result1.get().intValue());
assertEquals(1000000, result2.get().intValue());
}
@Test(expected = CancellationException.class)
public void whenCancelFutureAndCallGet_thenThrowException() throws InterruptedException, ExecutionException, TimeoutException {
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
Future<Integer> result = squareCalculator.calculate(4);
boolean canceled = result.cancel(true);
assertTrue("Future was canceled", canceled);
assertTrue("Future was canceled", result.isCancelled());
result.get();
}
@Before
public void start() {
start = System.currentTimeMillis();
}
@After
public void end() {
LOG.debug(String.format("Test %s took %s ms \n", name.getMethodName(), System.currentTimeMillis() - start));
}
}

View File

@@ -0,0 +1,133 @@
package com.baeldung.concurrent.runnable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang3.RandomUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RunnableVsThreadLiveTest {
private static Logger log =
LoggerFactory.getLogger(RunnableVsThreadLiveTest.class);
private static ExecutorService executorService;
@BeforeClass
public static void setup() {
executorService = Executors.newCachedThreadPool();
}
@Test
public void givenARunnable_whenRunIt_thenResult() throws Exception{
Thread thread = new Thread(new SimpleRunnable(
"SimpleRunnable executed using Thread"));
thread.start();
thread.join();
}
@Test
public void givenARunnable_whenSubmitToES_thenResult() throws Exception{
executorService.submit(new SimpleRunnable(
"SimpleRunnable executed using ExecutorService")).get();
}
@Test
public void givenARunnableLambda_whenSubmitToES_thenResult()
throws Exception{
executorService.submit(()->
log.info("Lambda runnable executed!!!")).get();
}
@Test
public void givenAThread_whenRunIt_thenResult() throws Exception{
Thread thread = new SimpleThread(
"SimpleThread executed using Thread");
thread.start();
thread.join();
}
@Test
public void givenAThread_whenSubmitToES_thenResult() throws Exception{
executorService.submit(new SimpleThread(
"SimpleThread executed using ExecutorService")).get();
}
@Test
public void givenACallable_whenSubmitToES_thenResult() throws Exception {
Future<Integer> future = executorService.submit(
new SimpleCallable());
log.info("Result from callable: {}", future.get());
}
@Test
public void givenACallableAsLambda_whenSubmitToES_thenResult()
throws Exception {
Future<Integer> future = executorService.submit(() -> RandomUtils.nextInt(0, 100));
log.info("Result from callable: {}", future.get());
}
@AfterClass
public static void tearDown() {
if ( executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
}
}
}
class SimpleThread extends Thread{
private static final Logger log =
LoggerFactory.getLogger(SimpleThread.class);
private String message;
SimpleThread(String message) {
this.message = message;
}
@Override
public void run() {
log.info(message);
}
}
class SimpleRunnable implements Runnable {
private static final Logger log =
LoggerFactory.getLogger(SimpleRunnable.class);
private String message;
SimpleRunnable(String message) {
this.message = message;
}
@Override
public void run() {
log.info(message);
}
}
class SimpleCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return RandomUtils.nextInt(0, 100);
}
}

View File

@@ -0,0 +1,56 @@
package com.baeldung.concurrent.stopping;
import static com.jayway.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import com.jayway.awaitility.Awaitility;
public class StopThreadManualTest {
@Test
public void whenStoppedThreadIsStopped() throws InterruptedException {
int interval = 5;
ControlSubThread controlSubThread = new ControlSubThread(interval);
controlSubThread.start();
// Give things a chance to get set up
Thread.sleep(interval);
assertTrue(controlSubThread.isRunning());
assertFalse(controlSubThread.isStopped());
// Stop it and make sure the flags have been reversed
controlSubThread.stop();
await()
.until(() -> assertTrue(controlSubThread.isStopped()));
}
@Test
public void whenInterruptedThreadIsStopped() throws InterruptedException {
int interval = 50;
ControlSubThread controlSubThread = new ControlSubThread(interval);
controlSubThread.start();
// Give things a chance to get set up
Thread.sleep(interval);
assertTrue(controlSubThread.isRunning());
assertFalse(controlSubThread.isStopped());
// Stop it and make sure the flags have been reversed
controlSubThread.interrupt();
// Wait less than the time we would normally sleep, and make sure we exited.
Awaitility.await()
.pollDelay(2, TimeUnit.MILLISECONDS)
.atMost(interval/ 10, TimeUnit.MILLISECONDS)
.until(controlSubThread::isStopped);
}
}

View File

@@ -0,0 +1,37 @@
package com.baeldung.concurrent.synchronize;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
public class BaeldungSychronizedBlockUnitTest {
@Test
public void givenMultiThread_whenBlockSync() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
BaeldungSynchronizedBlocks synchronizedBlocks = new BaeldungSynchronizedBlocks();
IntStream.range(0, 1000)
.forEach(count -> service.submit(synchronizedBlocks::performSynchronisedTask));
service.awaitTermination(500, TimeUnit.MILLISECONDS);
assertEquals(1000, synchronizedBlocks.getCount());
}
@Test
public void givenMultiThread_whenStaticSyncBlock() throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
IntStream.range(0, 1000)
.forEach(count -> service.submit(BaeldungSynchronizedBlocks::performStaticSyncTask));
service.awaitTermination(500, TimeUnit.MILLISECONDS);
assertEquals(1000, BaeldungSynchronizedBlocks.getStaticCount());
}
}

View File

@@ -0,0 +1,51 @@
package com.baeldung.concurrent.synchronize;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
public class BaeldungSynchronizeMethodsUnitTest {
@Test
@Ignore
public void givenMultiThread_whenNonSyncMethod() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
BaeldungSynchronizedMethods method = new BaeldungSynchronizedMethods();
IntStream.range(0, 1000)
.forEach(count -> service.submit(method::calculate));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, method.getSum());
}
@Test
public void givenMultiThread_whenMethodSync() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
BaeldungSynchronizedMethods method = new BaeldungSynchronizedMethods();
IntStream.range(0, 1000)
.forEach(count -> service.submit(method::synchronisedCalculate));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, method.getSyncSum());
}
@Test
public void givenMultiThread_whenStaticSyncMethod() throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
IntStream.range(0, 1000)
.forEach(count -> service.submit(BaeldungSynchronizedMethods::syncStaticCalculate));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, BaeldungSynchronizedMethods.staticSum);
}
}

View File

@@ -0,0 +1,66 @@
package com.baeldung.concurrent.waitandnotify;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class NetworkIntegrationTest {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private String expected;
@Before
public void setUpStreams() {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
}
@Before
public void setUpExpectedOutput() {
StringWriter expectedStringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(expectedStringWriter);
printWriter.println("First packet");
printWriter.println("Second packet");
printWriter.println("Third packet");
printWriter.println("Fourth packet");
printWriter.close();
expected = expectedStringWriter.toString();
}
@After
public void cleanUpStreams() {
System.setOut(null);
System.setErr(null);
}
@Test
public void givenSenderAndReceiver_whenSendingPackets_thenNetworkSynchronized() {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));
sender.start();
receiver.start();
//wait for sender and receiver to finish before we test against expected
try {
sender.join();
receiver.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
assertEquals(expected, outContent.toString());
}
}

View File

@@ -0,0 +1,162 @@
package com.baeldung.java8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
public class Java8ExecutorServiceIntegrationTest {
private Runnable runnableTask;
private Callable<String> callableTask;
private List<Callable<String>> callableTasks;
@Before
public void init() {
runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
}
@Test
public void creationSubmittingTaskShuttingDown_whenShutDown_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(runnableTask);
executorService.submit(callableTask);
executorService.shutdown();
assertTrue(executorService.isShutdown());
}
@Test
public void creationSubmittingTasksShuttingDownNow_whenShutDownAfterAwating_thenCorrect() {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.submit(callableTask);
}
List<Runnable> notExecutedTasks = smartShutdown(threadPoolExecutor);
assertTrue(threadPoolExecutor.isShutdown());
assertFalse(notExecutedTasks.isEmpty());
assertTrue(notExecutedTasks.size() < 98);
}
private List<Runnable> smartShutdown(ExecutorService executorService) {
List<Runnable> notExecutedTasks = new ArrayList<>();
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
notExecutedTasks = executorService.shutdownNow();
}
} catch (InterruptedException e) {
notExecutedTasks = executorService.shutdownNow();
}
return notExecutedTasks;
}
@Test
public void submittingTasks_whenExecutedOneAndAll_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
String result = null;
List<Future<String>> futures = new ArrayList<>();
try {
result = executorService.invokeAny(callableTasks);
futures = executorService.invokeAll(callableTasks);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
assertEquals("Task's execution", result);
assertTrue(futures.size() == 3);
}
@Test
public void submittingTaskShuttingDown_whenGetExpectedResult_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
result = future.get(200, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
executorService.shutdown();
assertEquals("Task's execution", result);
}
@Test
public void submittingTask_whenCanceled_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(callableTask);
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
executorService.shutdown();
assertTrue(canceled);
assertTrue(isCancelled);
}
@Test
public void submittingTaskScheduling_whenExecuted_thenCorrect() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
String result = null;
try {
result = resultFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
assertEquals("Task's execution", result);
}
}

View File

@@ -0,0 +1,13 @@
*.class
#folders#
/target
/neoDb*
/data
/src/main/webapp/WEB-INF/classes
*/META-INF/*
# Packaged files #
*.jar
*.war
*.ear