[BAEL-9555] - Created a core-java-modules folder
This commit is contained in:
26
core-java-modules/core-java-concurrency-advanced/.gitignore
vendored
Normal file
26
core-java-modules/core-java-concurrency-advanced/.gitignore
vendored
Normal file
@@ -0,0 +1,26 @@
|
||||
*.class
|
||||
|
||||
0.*
|
||||
|
||||
#folders#
|
||||
/target
|
||||
/neoDb*
|
||||
/data
|
||||
/src/main/webapp/WEB-INF/classes
|
||||
*/META-INF/*
|
||||
.resourceCache
|
||||
|
||||
# Packaged files #
|
||||
*.jar
|
||||
*.war
|
||||
*.ear
|
||||
|
||||
# Files generated by integration tests
|
||||
*.txt
|
||||
backup-pom.xml
|
||||
/bin/
|
||||
/temp
|
||||
|
||||
#IntelliJ specific
|
||||
.idea/
|
||||
*.iml
|
||||
25
core-java-modules/core-java-concurrency-advanced/README.md
Normal file
25
core-java-modules/core-java-concurrency-advanced/README.md
Normal file
@@ -0,0 +1,25 @@
|
||||
=========
|
||||
|
||||
## Core Java Concurrency Advanced Examples
|
||||
|
||||
### Relevant Articles:
|
||||
- [Introduction to Thread Pools in Java](http://www.baeldung.com/thread-pool-java-and-guava)
|
||||
- [Guide to CountDownLatch in Java](http://www.baeldung.com/java-countdown-latch)
|
||||
- [Guide to java.util.concurrent.Locks](http://www.baeldung.com/java-concurrent-locks)
|
||||
- [An Introduction to ThreadLocal in Java](http://www.baeldung.com/java-threadlocal)
|
||||
- [LongAdder and LongAccumulator in Java](http://www.baeldung.com/java-longadder-and-longaccumulator)
|
||||
- [The Dining Philosophers Problem in Java](http://www.baeldung.com/java-dining-philoshophers)
|
||||
- [Guide to the Java Phaser](http://www.baeldung.com/java-phaser)
|
||||
- [An Introduction to Atomic Variables in Java](http://www.baeldung.com/java-atomic-variables)
|
||||
- [CyclicBarrier in Java](http://www.baeldung.com/java-cyclic-barrier)
|
||||
- [Guide to the Volatile Keyword in Java](http://www.baeldung.com/java-volatile)
|
||||
- [Semaphores in Java](http://www.baeldung.com/java-semaphore)
|
||||
- [Daemon Threads in Java](http://www.baeldung.com/java-daemon-thread)
|
||||
- [Priority-based Job Scheduling in Java](http://www.baeldung.com/java-priority-job-schedule)
|
||||
- [Brief Introduction to Java Thread.yield()](https://www.baeldung.com/java-thread-yield)
|
||||
- [Print Even and Odd Numbers Using 2 Threads](https://www.baeldung.com/java-even-odd-numbers-with-2-threads)
|
||||
- [Java CyclicBarrier vs CountDownLatch](https://www.baeldung.com/java-cyclicbarrier-countdownlatch)
|
||||
- [Guide to the Fork/Join Framework in Java](http://www.baeldung.com/java-fork-join)
|
||||
- [Guide to ThreadLocalRandom in Java](http://www.baeldung.com/java-thread-local-random)
|
||||
- [The Thread.join() Method in Java](http://www.baeldung.com/java-thread-join)
|
||||
- [Passing Parameters to Java Threads](https://www.baeldung.com/java-thread-parameters)
|
||||
86
core-java-modules/core-java-concurrency-advanced/pom.xml
Normal file
86
core-java-modules/core-java-concurrency-advanced/pom.xml
Normal file
@@ -0,0 +1,86 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>core-java-concurrency-advanced</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<name>core-java-concurrency-advanced</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-java</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../../parent-java</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-collections4</artifactId>
|
||||
<version>${commons-collections4.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>${commons-io.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons-lang3.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-math3</artifactId>
|
||||
<version>${commons-math3.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>${avaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.openjdk.jmh</groupId>
|
||||
<artifactId>jmh-core</artifactId>
|
||||
<version>${jmh-core.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.openjdk.jmh</groupId>
|
||||
<artifactId>jmh-generator-annprocess</artifactId>
|
||||
<version>${jmh-generator-annprocess.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>core-java-concurrency-advanced</finalName>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/resources</directory>
|
||||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<!-- util -->
|
||||
<guava.version>21.0</guava.version>
|
||||
<commons-lang3.version>3.5</commons-lang3.version>
|
||||
<commons-math3.version>3.6.1</commons-math3.version>
|
||||
<commons-collections4.version>4.1</commons-collections4.version>
|
||||
<collections-generic.version>4.01</collections-generic.version>
|
||||
<!-- testing -->
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
<avaitility.version>1.7.0</avaitility.version>
|
||||
<jmh-core.version>1.19</jmh-core.version>
|
||||
<jmh-generator-annprocess.version>1.19</jmh-generator-annprocess.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
public class SafeCounterWithLock {
|
||||
private volatile int counter;
|
||||
|
||||
int getValue() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
synchronized void increment() {
|
||||
counter++;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class SafeCounterWithoutLock {
|
||||
private final AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
int getValue() {
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
void increment() {
|
||||
while(true) {
|
||||
int existingValue = getValue();
|
||||
int newValue = existingValue + 1;
|
||||
if(counter.compareAndSet(existingValue, newValue)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
public class UnsafeCounter {
|
||||
private int counter;
|
||||
|
||||
int getValue() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
void increment() {
|
||||
counter++;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class BrokenWorker implements Runnable {
|
||||
private final List<String> outputScraper;
|
||||
private final CountDownLatch countDownLatch;
|
||||
|
||||
BrokenWorker(final List<String> outputScraper, final CountDownLatch countDownLatch) {
|
||||
this.outputScraper = outputScraper;
|
||||
this.countDownLatch = countDownLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (true) {
|
||||
throw new RuntimeException("Oh dear");
|
||||
}
|
||||
countDownLatch.countDown();
|
||||
outputScraper.add("Counted down");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class CountdownLatchCountExample {
|
||||
|
||||
private int count;
|
||||
|
||||
public CountdownLatchCountExample(int count) {
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public boolean callTwiceInSameThread() {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(count);
|
||||
Thread t = new Thread(() -> {
|
||||
countDownLatch.countDown();
|
||||
countDownLatch.countDown();
|
||||
});
|
||||
t.start();
|
||||
|
||||
try {
|
||||
countDownLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return countDownLatch.getCount() == 0;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
CountdownLatchCountExample ex = new CountdownLatchCountExample(2);
|
||||
System.out.println("Is CountDown Completed : " + ex.callTwiceInSameThread());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class CountdownLatchResetExample {
|
||||
|
||||
private int count;
|
||||
private int threadCount;
|
||||
private final AtomicInteger updateCount;
|
||||
|
||||
CountdownLatchResetExample(int count, int threadCount) {
|
||||
updateCount = new AtomicInteger(0);
|
||||
this.count = count;
|
||||
this.threadCount = threadCount;
|
||||
}
|
||||
|
||||
public int countWaits() {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(count);
|
||||
ExecutorService es = Executors.newFixedThreadPool(threadCount);
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
es.execute(() -> {
|
||||
long prevValue = countDownLatch.getCount();
|
||||
countDownLatch.countDown();
|
||||
if (countDownLatch.getCount() != prevValue) {
|
||||
updateCount.incrementAndGet();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
es.shutdown();
|
||||
return updateCount.get();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
CountdownLatchResetExample ex = new CountdownLatchResetExample(5, 20);
|
||||
System.out.println("Count : " + ex.countWaits());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class WaitingWorker implements Runnable {
|
||||
|
||||
private final List<String> outputScraper;
|
||||
private final CountDownLatch readyThreadCounter;
|
||||
private final CountDownLatch callingThreadBlocker;
|
||||
private final CountDownLatch completedThreadCounter;
|
||||
|
||||
WaitingWorker(final List<String> outputScraper, final CountDownLatch readyThreadCounter, final CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) {
|
||||
|
||||
this.outputScraper = outputScraper;
|
||||
this.readyThreadCounter = readyThreadCounter;
|
||||
this.callingThreadBlocker = callingThreadBlocker;
|
||||
this.completedThreadCounter = completedThreadCounter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// Mark this thread as read / started
|
||||
readyThreadCounter.countDown();
|
||||
try {
|
||||
callingThreadBlocker.await();
|
||||
outputScraper.add("Counted down");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
completedThreadCounter.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Worker implements Runnable {
|
||||
private final List<String> outputScraper;
|
||||
private final CountDownLatch countDownLatch;
|
||||
|
||||
Worker(final List<String> outputScraper, final CountDownLatch countDownLatch) {
|
||||
this.outputScraper = outputScraper;
|
||||
this.countDownLatch = countDownLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// Do some work
|
||||
System.out.println("Doing some logic");
|
||||
outputScraper.add("Counted down");
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class CyclicBarrierCompletionMethodExample {
|
||||
|
||||
private int count;
|
||||
private int threadCount;
|
||||
private final AtomicInteger updateCount;
|
||||
|
||||
CyclicBarrierCompletionMethodExample(int count, int threadCount) {
|
||||
updateCount = new AtomicInteger(0);
|
||||
this.count = count;
|
||||
this.threadCount = threadCount;
|
||||
}
|
||||
|
||||
public int countTrips() {
|
||||
|
||||
CyclicBarrier cyclicBarrier = new CyclicBarrier(count, () -> {
|
||||
updateCount.incrementAndGet();
|
||||
});
|
||||
|
||||
ExecutorService es = Executors.newFixedThreadPool(threadCount);
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
es.execute(() -> {
|
||||
try {
|
||||
cyclicBarrier.await();
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
es.shutdown();
|
||||
return updateCount.get();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
CyclicBarrierCompletionMethodExample ex = new CyclicBarrierCompletionMethodExample(5, 20);
|
||||
System.out.println("Count : " + ex.countTrips());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
public class CyclicBarrierCountExample {
|
||||
|
||||
private int count;
|
||||
|
||||
public CyclicBarrierCountExample(int count) {
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public boolean callTwiceInSameThread() {
|
||||
CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
cyclicBarrier.await();
|
||||
cyclicBarrier.await();
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
return cyclicBarrier.isBroken();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
CyclicBarrierCountExample ex = new CyclicBarrierCountExample(7);
|
||||
System.out.println("Count : " + ex.callTwiceInSameThread());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
public class CyclicBarrierDemo {
|
||||
|
||||
private CyclicBarrier cyclicBarrier;
|
||||
private List<List<Integer>> partialResults = Collections.synchronizedList(new ArrayList<>());
|
||||
private Random random = new Random();
|
||||
private int NUM_PARTIAL_RESULTS;
|
||||
private int NUM_WORKERS;
|
||||
|
||||
private void runSimulation(int numWorkers, int numberOfPartialResults) {
|
||||
NUM_PARTIAL_RESULTS = numberOfPartialResults;
|
||||
NUM_WORKERS = numWorkers;
|
||||
|
||||
cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
|
||||
System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each");
|
||||
for (int i = 0; i < NUM_WORKERS; i++) {
|
||||
Thread worker = new Thread(new NumberCruncherThread());
|
||||
worker.setName("Thread " + i);
|
||||
worker.start();
|
||||
}
|
||||
}
|
||||
|
||||
class NumberCruncherThread implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String thisThreadName = Thread.currentThread().getName();
|
||||
List<Integer> partialResult = new ArrayList<>();
|
||||
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
|
||||
Integer num = random.nextInt(10);
|
||||
System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num);
|
||||
partialResult.add(num);
|
||||
}
|
||||
partialResults.add(partialResult);
|
||||
try {
|
||||
System.out.println(thisThreadName + " waiting for others to reach barrier.");
|
||||
cyclicBarrier.await();
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class AggregatorThread implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String thisThreadName = Thread.currentThread().getName();
|
||||
System.out.println(thisThreadName + ": Computing final sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
|
||||
int sum = 0;
|
||||
for (List<Integer> threadResult : partialResults) {
|
||||
System.out.print("Adding ");
|
||||
for (Integer partialResult : threadResult) {
|
||||
System.out.print(partialResult + " ");
|
||||
sum += partialResult;
|
||||
}
|
||||
System.out.println();
|
||||
}
|
||||
System.out.println(Thread.currentThread().getName() + ": Final result = " + sum);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
CyclicBarrierDemo play = new CyclicBarrierDemo();
|
||||
play.runSimulation(5, 3);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class CyclicBarrierResetExample {
|
||||
|
||||
private int count;
|
||||
private int threadCount;
|
||||
private final AtomicInteger updateCount;
|
||||
|
||||
CyclicBarrierResetExample(int count, int threadCount) {
|
||||
updateCount = new AtomicInteger(0);
|
||||
this.count = count;
|
||||
this.threadCount = threadCount;
|
||||
}
|
||||
|
||||
public int countWaits() {
|
||||
|
||||
CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
|
||||
|
||||
ExecutorService es = Executors.newFixedThreadPool(threadCount);
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
es.execute(() -> {
|
||||
try {
|
||||
if (cyclicBarrier.getNumberWaiting() > 0) {
|
||||
updateCount.incrementAndGet();
|
||||
}
|
||||
cyclicBarrier.await();
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
es.shutdown();
|
||||
return updateCount.get();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
CyclicBarrierResetExample ex = new CyclicBarrierResetExample(7, 20);
|
||||
System.out.println("Count : " + ex.countWaits());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.baeldung.concurrent.daemon;
|
||||
|
||||
public class MultipleThreadsExample {
|
||||
public static void main(String[] args) {
|
||||
NewThread t1 = new NewThread();
|
||||
t1.setName("MyThread-1");
|
||||
NewThread t2 = new NewThread();
|
||||
t2.setName("MyThread-2");
|
||||
t1.start();
|
||||
t2.start();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.baeldung.concurrent.daemon;
|
||||
|
||||
public class NewThread extends Thread {
|
||||
public void run() {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (true) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
System.out.println(this.getName() + ": New Thread is running..." + i);
|
||||
try {
|
||||
//Wait for one sec so it doesn't print too fast
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
// prevent the Thread to run forever. It will finish it's execution after 2 seconds
|
||||
if (System.currentTimeMillis() - startTime > 2000) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package com.baeldung.concurrent.daemon;
|
||||
|
||||
public class SingleThreadExample {
|
||||
public static void main(String[] args) {
|
||||
NewThread t = new NewThread();
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.baeldung.concurrent.diningphilosophers;
|
||||
|
||||
public class DiningPhilosophers {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
Philosopher[] philosophers = new Philosopher[5];
|
||||
Object[] forks = new Object[philosophers.length];
|
||||
|
||||
for (int i = 0; i < forks.length; i++) {
|
||||
forks[i] = new Object();
|
||||
}
|
||||
|
||||
for (int i = 0; i < philosophers.length; i++) {
|
||||
|
||||
Object leftFork = forks[i];
|
||||
Object rightFork = forks[(i + 1) % forks.length];
|
||||
|
||||
if (i == philosophers.length - 1) {
|
||||
philosophers[i] = new Philosopher(rightFork, leftFork); // The last philosopher picks up the right fork first
|
||||
} else {
|
||||
philosophers[i] = new Philosopher(leftFork, rightFork);
|
||||
}
|
||||
|
||||
Thread t = new Thread(philosophers[i], "Philosopher " + (i + 1));
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.baeldung.concurrent.diningphilosophers;
|
||||
|
||||
public class Philosopher implements Runnable {
|
||||
|
||||
private final Object leftFork;
|
||||
private final Object rightFork;
|
||||
|
||||
Philosopher(Object left, Object right) {
|
||||
this.leftFork = left;
|
||||
this.rightFork = right;
|
||||
}
|
||||
|
||||
private void doAction(String action) throws InterruptedException {
|
||||
System.out.println(Thread.currentThread().getName() + " " + action);
|
||||
Thread.sleep(((int) (Math.random() * 100)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
doAction(System.nanoTime() + ": Thinking"); // thinking
|
||||
synchronized (leftFork) {
|
||||
doAction(System.nanoTime() + ": Picked up left fork");
|
||||
synchronized (rightFork) {
|
||||
doAction(System.nanoTime() + ": Picked up right fork - eating"); // eating
|
||||
doAction(System.nanoTime() + ": Put down right fork");
|
||||
}
|
||||
doAction(System.nanoTime() + ": Put down left fork. Returning to thinking");
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package com.baeldung.concurrent.evenandodd;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
public class PrintEvenOddSemaphore {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SharedPrinter sp = new SharedPrinter();
|
||||
Thread odd = new Thread(new Odd(sp, 10), "Odd");
|
||||
Thread even = new Thread(new Even(sp, 10), "Even");
|
||||
|
||||
odd.start();
|
||||
even.start();
|
||||
}
|
||||
}
|
||||
|
||||
class SharedPrinter {
|
||||
|
||||
private final Semaphore semEven = new Semaphore(0);
|
||||
private final Semaphore semOdd = new Semaphore(1);
|
||||
|
||||
void printEvenNum(int num) {
|
||||
try {
|
||||
semEven.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
System.out.println(Thread.currentThread().getName() + ":"+num);
|
||||
semOdd.release();
|
||||
}
|
||||
|
||||
void printOddNum(int num) {
|
||||
try {
|
||||
semOdd.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
System.out.println(Thread.currentThread().getName() + ":"+ num);
|
||||
semEven.release();
|
||||
}
|
||||
}
|
||||
|
||||
class Even implements Runnable {
|
||||
private final SharedPrinter sp;
|
||||
private final int max;
|
||||
|
||||
Even(SharedPrinter sp, int max) {
|
||||
this.sp = sp;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 2; i <= max; i = i + 2) {
|
||||
sp.printEvenNum(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Odd implements Runnable {
|
||||
private SharedPrinter sp;
|
||||
private int max;
|
||||
|
||||
Odd(SharedPrinter sp, int max) {
|
||||
this.sp = sp;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 1; i <= max; i = i + 2) {
|
||||
sp.printOddNum(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package com.baeldung.concurrent.evenandodd;
|
||||
|
||||
public class PrintEvenOddWaitNotify {
|
||||
|
||||
public static void main(String... args) {
|
||||
Printer print = new Printer();
|
||||
Thread t1 = new Thread(new TaskEvenOdd(print, 10, false), "Odd");
|
||||
Thread t2 = new Thread(new TaskEvenOdd(print, 10, true), "Even");
|
||||
t1.start();
|
||||
t2.start();
|
||||
}
|
||||
}
|
||||
|
||||
class TaskEvenOdd implements Runnable {
|
||||
private final int max;
|
||||
private final Printer print;
|
||||
private final boolean isEvenNumber;
|
||||
|
||||
TaskEvenOdd(Printer print, int max, boolean isEvenNumber) {
|
||||
this.print = print;
|
||||
this.max = max;
|
||||
this.isEvenNumber = isEvenNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
int number = isEvenNumber ? 2 : 1;
|
||||
while (number <= max) {
|
||||
if (isEvenNumber) {
|
||||
print.printEven(number);
|
||||
} else {
|
||||
print.printOdd(number);
|
||||
}
|
||||
number += 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Printer {
|
||||
private volatile boolean isOdd;
|
||||
|
||||
synchronized void printEven(int number) {
|
||||
while (!isOdd) {
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
System.out.println(Thread.currentThread().getName() + ":" + number);
|
||||
isOdd = false;
|
||||
notify();
|
||||
}
|
||||
|
||||
synchronized void printOdd(int number) {
|
||||
while (isOdd) {
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
System.out.println(Thread.currentThread().getName() + ":" + number);
|
||||
isOdd = true;
|
||||
notify();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Stack;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class ReentrantLockWithCondition {
|
||||
|
||||
private static Logger LOG = LoggerFactory.getLogger(ReentrantLockWithCondition.class);
|
||||
|
||||
private Stack<String> stack = new Stack<>();
|
||||
private static final int CAPACITY = 5;
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
private Condition stackEmptyCondition = lock.newCondition();
|
||||
private Condition stackFullCondition = lock.newCondition();
|
||||
|
||||
private void pushToStack(String item) throws InterruptedException {
|
||||
try {
|
||||
lock.lock();
|
||||
if (stack.size() == CAPACITY) {
|
||||
LOG.info(Thread.currentThread().getName() + " wait on stack full");
|
||||
stackFullCondition.await();
|
||||
}
|
||||
LOG.info("Pushing the item " + item);
|
||||
stack.push(item);
|
||||
stackEmptyCondition.signalAll();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String popFromStack() throws InterruptedException {
|
||||
try {
|
||||
lock.lock();
|
||||
if (stack.size() == 0) {
|
||||
LOG.info(Thread.currentThread().getName() + " wait on stack empty");
|
||||
stackEmptyCondition.await();
|
||||
}
|
||||
return stack.pop();
|
||||
} finally {
|
||||
stackFullCondition.signalAll();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
final int threadCount = 2;
|
||||
ReentrantLockWithCondition object = new ReentrantLockWithCondition();
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
service.execute(() -> {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
object.pushToStack("Item " + i);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
service.execute(() -> {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
LOG.info("Item popped " + object.popFromStack());
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class SharedObjectWithLock {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SharedObjectWithLock.class);
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock(true);
|
||||
|
||||
private int counter = 0;
|
||||
|
||||
void perform() {
|
||||
|
||||
lock.lock();
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " acquired the lock");
|
||||
try {
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " processing");
|
||||
counter++;
|
||||
} catch (Exception exception) {
|
||||
LOG.error(" Interrupted Exception ", exception);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " released the lock");
|
||||
}
|
||||
}
|
||||
|
||||
private void performTryLock() {
|
||||
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " attempting to acquire the lock");
|
||||
try {
|
||||
boolean isLockAcquired = lock.tryLock(2, TimeUnit.SECONDS);
|
||||
if (isLockAcquired) {
|
||||
try {
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " acquired the lock");
|
||||
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " processing");
|
||||
sleep(1000);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " released the lock");
|
||||
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException exception) {
|
||||
LOG.error(" Interrupted Exception ", exception);
|
||||
}
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " could not acquire the lock");
|
||||
}
|
||||
|
||||
public ReentrantLock getLock() {
|
||||
return lock;
|
||||
}
|
||||
|
||||
boolean isLocked() {
|
||||
return lock.isLocked();
|
||||
}
|
||||
|
||||
boolean hasQueuedThreads() {
|
||||
return lock.hasQueuedThreads();
|
||||
}
|
||||
|
||||
int getCounter() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
final int threadCount = 2;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
service.execute(object::perform);
|
||||
service.execute(object::performTryLock);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class StampedLockDemo {
|
||||
private Map<String, String> map = new HashMap<>();
|
||||
private Logger logger = LoggerFactory.getLogger(StampedLockDemo.class);
|
||||
|
||||
private final StampedLock lock = new StampedLock();
|
||||
|
||||
public void put(String key, String value) throws InterruptedException {
|
||||
long stamp = lock.writeLock();
|
||||
|
||||
try {
|
||||
logger.info(Thread.currentThread().getName() + " acquired the write lock with stamp " + stamp);
|
||||
map.put(key, value);
|
||||
} finally {
|
||||
lock.unlockWrite(stamp);
|
||||
logger.info(Thread.currentThread().getName() + " unlocked the write lock with stamp " + stamp);
|
||||
}
|
||||
}
|
||||
|
||||
public String get(String key) throws InterruptedException {
|
||||
long stamp = lock.readLock();
|
||||
logger.info(Thread.currentThread().getName() + " acquired the read lock with stamp " + stamp);
|
||||
try {
|
||||
sleep(5000);
|
||||
return map.get(key);
|
||||
|
||||
} finally {
|
||||
lock.unlockRead(stamp);
|
||||
logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String readWithOptimisticLock(String key) throws InterruptedException {
|
||||
long stamp = lock.tryOptimisticRead();
|
||||
String value = map.get(key);
|
||||
|
||||
if (!lock.validate(stamp)) {
|
||||
stamp = lock.readLock();
|
||||
try {
|
||||
sleep(5000);
|
||||
return map.get(key);
|
||||
|
||||
} finally {
|
||||
lock.unlock(stamp);
|
||||
logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp);
|
||||
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
final int threadCount = 4;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
StampedLockDemo object = new StampedLockDemo();
|
||||
|
||||
Runnable writeTask = () -> {
|
||||
|
||||
try {
|
||||
object.put("key1", "value1");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
Runnable readTask = () -> {
|
||||
|
||||
try {
|
||||
object.get("key1");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
Runnable readOptimisticTask = () -> {
|
||||
|
||||
try {
|
||||
object.readWithOptimisticLock("key1");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
service.submit(writeTask);
|
||||
service.submit(writeTask);
|
||||
service.submit(readTask);
|
||||
service.submit(readOptimisticTask);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class SynchronizedHashMapWithRWLock {
|
||||
|
||||
private static Map<String, String> syncHashMap = new HashMap<>();
|
||||
private Logger logger = LoggerFactory.getLogger(SynchronizedHashMapWithRWLock.class);
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final Lock readLock = lock.readLock();
|
||||
private final Lock writeLock = lock.writeLock();
|
||||
|
||||
public void put(String key, String value) throws InterruptedException {
|
||||
|
||||
try {
|
||||
writeLock.lock();
|
||||
logger.info(Thread.currentThread().getName() + " writing");
|
||||
syncHashMap.put(key, value);
|
||||
sleep(1000);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public String get(String key) {
|
||||
try {
|
||||
readLock.lock();
|
||||
logger.info(Thread.currentThread().getName() + " reading");
|
||||
return syncHashMap.get(key);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public String remove(String key) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
return syncHashMap.remove(key);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean containsKey(String key) {
|
||||
try {
|
||||
readLock.lock();
|
||||
return syncHashMap.containsKey(key);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean isReadLockAvailable() {
|
||||
return readLock.tryLock();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
final int threadCount = 3;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
|
||||
|
||||
service.execute(new Thread(new Writer(object), "Writer"));
|
||||
service.execute(new Thread(new Reader(object), "Reader1"));
|
||||
service.execute(new Thread(new Reader(object), "Reader2"));
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
private static class Reader implements Runnable {
|
||||
|
||||
SynchronizedHashMapWithRWLock object;
|
||||
|
||||
Reader(SynchronizedHashMapWithRWLock object) {
|
||||
this.object = object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
object.get("key" + i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class Writer implements Runnable {
|
||||
|
||||
SynchronizedHashMapWithRWLock object;
|
||||
|
||||
public Writer(SynchronizedHashMapWithRWLock object) {
|
||||
this.object = object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
object.put("key" + i, "value" + i);
|
||||
sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.baeldung.concurrent.parameter;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class AverageCalculator implements Callable<Double> {
|
||||
|
||||
int[] numbers;
|
||||
|
||||
public AverageCalculator(int... parameter) {
|
||||
this.numbers = parameter == null ? new int[0] : parameter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double call() throws Exception {
|
||||
return IntStream.of(this.numbers)
|
||||
.average()
|
||||
.orElse(0d);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.baeldung.concurrent.parameter;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class ParameterizedThreadExample {
|
||||
|
||||
public static void parameterisedThreadAnonymousClass() {
|
||||
final String parameter = "123";
|
||||
Thread parameterizedThread = new Thread(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.println(Thread.currentThread()
|
||||
.getName() + " : " + parameter);
|
||||
}
|
||||
});
|
||||
|
||||
parameterizedThread.start();
|
||||
}
|
||||
|
||||
public static Callable<Integer> sumCalculator(int... numbers) {
|
||||
return () -> numbers != null ? IntStream.of(numbers)
|
||||
.sum() : 0;
|
||||
}
|
||||
|
||||
public static Callable<Double> averageCalculator(int... numbers) {
|
||||
return () -> numbers != null ? IntStream.of(numbers)
|
||||
.average()
|
||||
.orElse(0d) : 0d;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.baeldung.concurrent.phaser;
|
||||
|
||||
import java.util.concurrent.Phaser;
|
||||
|
||||
class LongRunningAction implements Runnable {
|
||||
private String threadName;
|
||||
private Phaser ph;
|
||||
|
||||
LongRunningAction(String threadName, Phaser ph) {
|
||||
this.threadName = threadName;
|
||||
this.ph = ph;
|
||||
ph.register();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.println("This is phase " + ph.getPhase());
|
||||
System.out.println("Thread " + threadName + " before long running action");
|
||||
ph.arriveAndAwaitAdvance();
|
||||
try {
|
||||
Thread.sleep(20);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
ph.arriveAndDeregister();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.concurrent.prioritytaskexecution;
|
||||
|
||||
public class Job implements Runnable {
|
||||
private String jobName;
|
||||
private JobPriority jobPriority;
|
||||
|
||||
public Job(String jobName, JobPriority jobPriority) {
|
||||
this.jobName = jobName;
|
||||
this.jobPriority = jobPriority != null ? jobPriority : JobPriority.MEDIUM;
|
||||
}
|
||||
|
||||
public JobPriority getJobPriority() {
|
||||
return jobPriority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
System.out.println("Job:" + jobName +
|
||||
" Priority:" + jobPriority);
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.baeldung.concurrent.prioritytaskexecution;
|
||||
|
||||
public enum JobPriority {
|
||||
HIGH,
|
||||
MEDIUM,
|
||||
LOW
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.baeldung.concurrent.prioritytaskexecution;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class PriorityJobScheduler {
|
||||
|
||||
private ExecutorService priorityJobPoolExecutor;
|
||||
private ExecutorService priorityJobScheduler =
|
||||
Executors.newSingleThreadExecutor();
|
||||
private PriorityBlockingQueue<Job> priorityQueue;
|
||||
|
||||
public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
|
||||
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
|
||||
priorityQueue = new PriorityBlockingQueue<Job>(queueSize,
|
||||
Comparator.comparing(Job::getJobPriority));
|
||||
|
||||
priorityJobScheduler.execute(()->{
|
||||
while (true) {
|
||||
try {
|
||||
priorityJobPoolExecutor.execute(priorityQueue.take());
|
||||
} catch (InterruptedException e) {
|
||||
// exception needs special handling
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void scheduleJob(Job job) {
|
||||
priorityQueue.add(job);
|
||||
}
|
||||
|
||||
public int getQueuedTaskCount() {
|
||||
return priorityQueue.size();
|
||||
}
|
||||
|
||||
protected void close(ExecutorService scheduler) {
|
||||
scheduler.shutdown();
|
||||
try {
|
||||
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public void closeScheduler() {
|
||||
close(priorityJobPoolExecutor);
|
||||
close(priorityJobScheduler);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
class CounterUsingMutex {
|
||||
|
||||
private final Semaphore mutex;
|
||||
private int count;
|
||||
|
||||
CounterUsingMutex() {
|
||||
mutex = new Semaphore(1);
|
||||
count = 0;
|
||||
}
|
||||
|
||||
void increase() throws InterruptedException {
|
||||
mutex.acquire();
|
||||
this.count = this.count + 1;
|
||||
Thread.sleep(1000);
|
||||
mutex.release();
|
||||
|
||||
}
|
||||
|
||||
int getCount() {
|
||||
return this.count;
|
||||
}
|
||||
|
||||
boolean hasQueuedThreads() {
|
||||
return mutex.hasQueuedThreads();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.TimedSemaphore;
|
||||
|
||||
class DelayQueueUsingTimedSemaphore {
|
||||
|
||||
private final TimedSemaphore semaphore;
|
||||
|
||||
DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
|
||||
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
|
||||
}
|
||||
|
||||
boolean tryAdd() {
|
||||
return semaphore.tryAcquire();
|
||||
}
|
||||
|
||||
int availableSlots() {
|
||||
return semaphore.getAvailablePermits();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
class LoginQueueUsingSemaphore {
|
||||
|
||||
private final Semaphore semaphore;
|
||||
|
||||
LoginQueueUsingSemaphore(int slotLimit) {
|
||||
semaphore = new Semaphore(slotLimit);
|
||||
}
|
||||
|
||||
boolean tryLogin() {
|
||||
return semaphore.tryAcquire();
|
||||
}
|
||||
|
||||
void logout() {
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
int availableSlots() {
|
||||
return semaphore.availablePermits();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.concurrent.volatilekeyword;
|
||||
|
||||
|
||||
public class SharedObject {
|
||||
private volatile int count=0;
|
||||
|
||||
void increamentCount(){
|
||||
count++;
|
||||
}
|
||||
public int getCount(){
|
||||
return count;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.baeldung.concurrent.yield;
|
||||
|
||||
public class ThreadYield {
|
||||
public static void main(String[] args) {
|
||||
Runnable r = () -> {
|
||||
int counter = 0;
|
||||
while (counter < 2) {
|
||||
System.out.println(Thread.currentThread()
|
||||
.getName());
|
||||
counter++;
|
||||
Thread.yield();
|
||||
}
|
||||
};
|
||||
new Thread(r).start();
|
||||
new Thread(r).start();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.baeldung.forkjoin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.RecursiveAction;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class CustomRecursiveAction extends RecursiveAction {
|
||||
|
||||
private String workLoad = "";
|
||||
private static final int THRESHOLD = 4;
|
||||
|
||||
private static Logger logger = Logger.getAnonymousLogger();
|
||||
|
||||
public CustomRecursiveAction(String workLoad) {
|
||||
this.workLoad = workLoad;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void compute() {
|
||||
|
||||
if (workLoad.length() > THRESHOLD) {
|
||||
ForkJoinTask.invokeAll(createSubtasks());
|
||||
} else {
|
||||
processing(workLoad);
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<CustomRecursiveAction> createSubtasks() {
|
||||
|
||||
List<CustomRecursiveAction> subtasks = new ArrayList<>();
|
||||
|
||||
String partOne = workLoad.substring(0, workLoad.length() / 2);
|
||||
String partTwo = workLoad.substring(workLoad.length() / 2, workLoad.length());
|
||||
|
||||
subtasks.add(new CustomRecursiveAction(partOne));
|
||||
subtasks.add(new CustomRecursiveAction(partTwo));
|
||||
|
||||
return subtasks;
|
||||
}
|
||||
|
||||
private void processing(String work) {
|
||||
String result = work.toUpperCase();
|
||||
logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread()
|
||||
.getName());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.baeldung.forkjoin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.RecursiveTask;
|
||||
|
||||
public class CustomRecursiveTask extends RecursiveTask<Integer> {
|
||||
|
||||
private int[] arr;
|
||||
|
||||
private static final int THRESHOLD = 20;
|
||||
|
||||
public CustomRecursiveTask(int[] arr) {
|
||||
this.arr = arr;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer compute() {
|
||||
|
||||
if (arr.length > THRESHOLD) {
|
||||
|
||||
return ForkJoinTask.invokeAll(createSubtasks()).stream().mapToInt(ForkJoinTask::join).sum();
|
||||
|
||||
} else {
|
||||
return processing(arr);
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<CustomRecursiveTask> createSubtasks() {
|
||||
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
|
||||
dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, 0, arr.length / 2)));
|
||||
dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
|
||||
return dividedTasks;
|
||||
}
|
||||
|
||||
private Integer processing(int[] arr) {
|
||||
return Arrays.stream(arr).filter(a -> a > 10 && a < 27).map(a -> a * 10).sum();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.baeldung.forkjoin.util;
|
||||
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
||||
public class PoolUtil {
|
||||
|
||||
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.baeldung.threadlocal;
|
||||
|
||||
|
||||
public class Context {
|
||||
private final String userName;
|
||||
|
||||
Context(String userName) {
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Context{" +
|
||||
"userNameSecret='" + userName + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.threadlocal;
|
||||
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SharedMapWithUserContext implements Runnable {
|
||||
final static Map<Integer, Context> userContextPerUserId = new ConcurrentHashMap<>();
|
||||
private final Integer userId;
|
||||
private UserRepository userRepository = new UserRepository();
|
||||
|
||||
SharedMapWithUserContext(Integer userId) {
|
||||
this.userId = userId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String userName = userRepository.getUserNameForUserId(userId);
|
||||
userContextPerUserId.put(userId, new Context(userName));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.baeldung.threadlocal;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ThreadLocalWithUserContext implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ThreadLocalWithUserContext.class);
|
||||
|
||||
private static final ThreadLocal<Context> userContext = new ThreadLocal<>();
|
||||
private final Integer userId;
|
||||
private UserRepository userRepository = new UserRepository();
|
||||
|
||||
ThreadLocalWithUserContext(Integer userId) {
|
||||
this.userId = userId;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String userName = userRepository.getUserNameForUserId(userId);
|
||||
userContext.set(new Context(userName));
|
||||
LOG.debug("thread context for given userId: " + userId + " is: " + userContext.get());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.baeldung.threadlocal;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
|
||||
public class UserRepository {
|
||||
String getUserNameForUserId(Integer userId) {
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.threadlocalrandom;
|
||||
|
||||
import org.openjdk.jmh.runner.Runner;
|
||||
import org.openjdk.jmh.runner.options.Options;
|
||||
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||
|
||||
public class ThreadLocalRandomBenchMarkRunner {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
Options options = new OptionsBuilder().include(ThreadLocalRandomBenchMarker.class.getSimpleName())
|
||||
.threads(1)
|
||||
.forks(1)
|
||||
.shouldFailOnError(true)
|
||||
.shouldDoGC(true)
|
||||
.jvmArgs("-server")
|
||||
.build();
|
||||
|
||||
new Runner(options).run();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.baeldung.threadlocalrandom;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Level;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
|
||||
@BenchmarkMode(Mode.AverageTime)
|
||||
@Warmup(iterations = 1)
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
@State(Scope.Benchmark)
|
||||
public class ThreadLocalRandomBenchMarker {
|
||||
|
||||
List<Callable<Integer>> randomCallables = new ArrayList<>();
|
||||
List<Callable<Integer>> threadLocalRandomCallables = new ArrayList<>();
|
||||
|
||||
@Setup(Level.Iteration)
|
||||
public void init() {
|
||||
Random random = new Random();
|
||||
randomCallables = new ArrayList<>();
|
||||
threadLocalRandomCallables = new ArrayList<>();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
randomCallables.add(() -> {
|
||||
return random.nextInt();
|
||||
});
|
||||
}
|
||||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
threadLocalRandomCallables.add(() -> {
|
||||
return ThreadLocalRandom.current()
|
||||
.nextInt();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void randomValuesUsingRandom() throws InterruptedException {
|
||||
ExecutorService executor = Executors.newWorkStealingPool();
|
||||
executor.invokeAll(randomCallables);
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void randomValuesUsingThreadLocalRandom() throws InterruptedException {
|
||||
ExecutorService executor = Executors.newWorkStealingPool();
|
||||
executor.invokeAll(threadLocalRandomCallables);
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.RecursiveTask;
|
||||
|
||||
public class CountingTask extends RecursiveTask<Integer> {
|
||||
|
||||
private final TreeNode node;
|
||||
|
||||
CountingTask(TreeNode node) {
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer compute() {
|
||||
return node.getValue() + node.getChildren().stream()
|
||||
.map(childNode -> new CountingTask(childNode).fork())
|
||||
.mapToInt(ForkJoinTask::join)
|
||||
.sum();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
/**
|
||||
* This class demonstrates the usage of Guava's exiting executor services that keep the VM from hanging.
|
||||
* Without the exiting executor service, the task would hang indefinitely.
|
||||
* This behaviour cannot be demonstrated in JUnit tests, as JUnit kills the VM after the tests.
|
||||
*/
|
||||
public class ExitingExecutorServiceExample {
|
||||
|
||||
public static void main(String... args) {
|
||||
|
||||
final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
|
||||
final ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS);
|
||||
|
||||
executorService.submit((Runnable) () -> {
|
||||
while (true) {
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class TreeNode {
|
||||
|
||||
private int value;
|
||||
|
||||
private Set<TreeNode> children;
|
||||
|
||||
TreeNode(int value, TreeNode... children) {
|
||||
this.value = value;
|
||||
this.children = Sets.newHashSet(children);
|
||||
}
|
||||
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public Set<TreeNode> getChildren() {
|
||||
return children;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
# Set root logger level to DEBUG and its only appender to A1.
|
||||
log4j.rootLogger=DEBUG, A1
|
||||
|
||||
# A1 is set to be a ConsoleAppender.
|
||||
log4j.appender.A1=org.apache.log4j.ConsoleAppender
|
||||
|
||||
# A1 uses PatternLayout.
|
||||
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
||||
@@ -0,0 +1,19 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="org.springframework" level="WARN" />
|
||||
<logger name="org.springframework.transaction" level="WARN" />
|
||||
|
||||
<!-- in order to debug some marshalling issues, this needs to be TRACE -->
|
||||
<logger name="org.springframework.web.servlet.mvc" level="WARN" />
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.baeldung.concurrent.accumulator;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.LongAccumulator;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class LongAccumulatorUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenLongAccumulator_whenApplyActionOnItFromMultipleThrads_thenShouldProduceProperResult() throws InterruptedException {
|
||||
// given
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||
LongBinaryOperator sum = Long::sum;
|
||||
LongAccumulator accumulator = new LongAccumulator(sum, 0L);
|
||||
int numberOfThreads = 4;
|
||||
int numberOfIncrements = 100;
|
||||
|
||||
// when
|
||||
Runnable accumulateAction = () -> IntStream.rangeClosed(0, numberOfIncrements).forEach(accumulator::accumulate);
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(accumulateAction);
|
||||
}
|
||||
|
||||
// then
|
||||
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executorService.shutdown();
|
||||
assertEquals(accumulator.get(), 20200);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package com.baeldung.concurrent.adder;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static com.jayway.awaitility.Awaitility.await;
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class LongAdderUnitTest {
|
||||
@Test
|
||||
public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThem() throws InterruptedException {
|
||||
//given
|
||||
LongAdder counter = new LongAdder();
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||
|
||||
int numberOfThreads = 4;
|
||||
int numberOfIncrements = 100;
|
||||
|
||||
//when
|
||||
Runnable incrementAction = () -> IntStream
|
||||
.range(0, numberOfIncrements)
|
||||
.forEach((i) -> counter.increment());
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(incrementAction);
|
||||
}
|
||||
|
||||
//then
|
||||
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);
|
||||
assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThemAndResetAdderAfterward() throws InterruptedException {
|
||||
//given
|
||||
LongAdder counter = new LongAdder();
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(8);
|
||||
|
||||
int numberOfThreads = 4;
|
||||
int numberOfIncrements = 100;
|
||||
|
||||
//when
|
||||
Runnable incrementAction = () -> IntStream
|
||||
.range(0, numberOfIncrements)
|
||||
.forEach((i) -> counter.increment());
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.execute(incrementAction);
|
||||
}
|
||||
|
||||
//then
|
||||
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
executorService.shutdown();
|
||||
|
||||
assertEquals(counter.sumThenReset(), numberOfIncrements * numberOfThreads);
|
||||
|
||||
await().until(() -> assertEquals(counter.sum(), 0));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class ThreadSafeCounterIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenSafeCounterWithLockIncrement() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
SafeCounterWithLock safeCounter = new SafeCounterWithLock();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(safeCounter::increment));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, safeCounter.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenSafeCounterWithoutLockIncrement() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
SafeCounterWithoutLock safeCounter = new SafeCounterWithoutLock();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(safeCounter::increment));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, safeCounter.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.baeldung.concurrent.atomic;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This test shows the behaviour of a thread-unsafe class in a multithreaded scenario. We are calling
|
||||
* the increment methods 1000 times from a pool of 3 threads. In most of the cases, the counter will
|
||||
* less than 1000, because of lost updates, however, occasionally it may reach 1000, when no threads
|
||||
* called the method simultaneously. This may cause the build to fail occasionally. Hence excluding this
|
||||
* test from build by adding this in manual test
|
||||
*/
|
||||
public class ThreadUnsafeCounterManualTest {
|
||||
|
||||
@Test
|
||||
public void givenMultiThread_whenUnsafeCounterIncrement() throws InterruptedException {
|
||||
ExecutorService service = Executors.newFixedThreadPool(3);
|
||||
UnsafeCounter unsafeCounter = new UnsafeCounter();
|
||||
|
||||
IntStream.range(0, 1000)
|
||||
.forEach(count -> service.submit(unsafeCounter::increment));
|
||||
service.awaitTermination(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertEquals(1000, unsafeCounter.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class CountdownLatchCountExampleUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenCountDownLatch_completed() {
|
||||
CountdownLatchCountExample ex = new CountdownLatchCountExample(2);
|
||||
boolean isCompleted = ex.callTwiceInSameThread();
|
||||
assertTrue(isCompleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class CountdownLatchExampleIntegrationTest {
|
||||
@Test
|
||||
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException {
|
||||
// Given
|
||||
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
|
||||
CountDownLatch countDownLatch = new CountDownLatch(5);
|
||||
List<Thread> workers = Stream.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
|
||||
.limit(5)
|
||||
.collect(toList());
|
||||
|
||||
// When
|
||||
workers.forEach(Thread::start);
|
||||
countDownLatch.await(); // Block until workers finish
|
||||
outputScraper.add("Latch released");
|
||||
|
||||
// Then
|
||||
assertThat(outputScraper).containsExactly("Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenFailingToParallelProcess_thenMainThreadShouldTimeout() throws InterruptedException {
|
||||
// Given
|
||||
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
|
||||
CountDownLatch countDownLatch = new CountDownLatch(5);
|
||||
List<Thread> workers = Stream.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
|
||||
.limit(5)
|
||||
.collect(toList());
|
||||
|
||||
// When
|
||||
workers.forEach(Thread::start);
|
||||
final boolean result = countDownLatch.await(3L, TimeUnit.SECONDS);
|
||||
|
||||
// Then
|
||||
assertThat(result).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException {
|
||||
// Given
|
||||
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
|
||||
CountDownLatch readyThreadCounter = new CountDownLatch(5);
|
||||
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
|
||||
CountDownLatch completedThreadCounter = new CountDownLatch(5);
|
||||
List<Thread> workers = Stream.generate(() -> new Thread(new WaitingWorker(outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))).limit(5).collect(toList());
|
||||
|
||||
// When
|
||||
workers.forEach(Thread::start);
|
||||
readyThreadCounter.await(); // Block until workers start
|
||||
outputScraper.add("Workers ready");
|
||||
callingThreadBlocker.countDown(); // Start workers
|
||||
completedThreadCounter.await(); // Block until workers finish
|
||||
outputScraper.add("Workers complete");
|
||||
|
||||
// Then
|
||||
assertThat(outputScraper).containsExactly("Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class CountdownLatchResetExampleUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenCountDownLatch_noReset() {
|
||||
CountdownLatchResetExample ex = new CountdownLatchResetExample(7,20);
|
||||
int lineCount = ex.countWaits();
|
||||
assertTrue(lineCount <= 7);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class CyclicBarrierCompletionMethodExampleUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenCyclicBarrier_countTrips() {
|
||||
CyclicBarrierCompletionMethodExample ex = new CyclicBarrierCompletionMethodExample(7,20);
|
||||
int lineCount = ex.countTrips();
|
||||
assertEquals(2, lineCount);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class CyclicBarrierCountExampleUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenCyclicBarrier_notCompleted() {
|
||||
CyclicBarrierCountExample ex = new CyclicBarrierCountExample(2);
|
||||
boolean isCompleted = ex.callTwiceInSameThread();
|
||||
assertFalse(isCompleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.concurrent.cyclicbarrier;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class CyclicBarrierResetExampleUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenCyclicBarrier_reset() {
|
||||
CyclicBarrierResetExample ex = new CyclicBarrierResetExample(7,20);
|
||||
int lineCount = ex.countWaits();
|
||||
assertTrue(lineCount > 7);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.baeldung.concurrent.daemon;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class DaemonThreadUnitTest {
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void whenCallIsDaemon_thenCorrect() {
|
||||
NewThread daemonThread = new NewThread();
|
||||
NewThread userThread = new NewThread();
|
||||
daemonThread.setDaemon(true);
|
||||
daemonThread.start();
|
||||
userThread.start();
|
||||
|
||||
assertTrue(daemonThread.isDaemon());
|
||||
assertFalse(userThread.isDaemon());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalThreadStateException.class)
|
||||
@Ignore
|
||||
public void givenUserThread_whenSetDaemonWhileRunning_thenIllegalThreadStateException() {
|
||||
NewThread daemonThread = new NewThread();
|
||||
daemonThread.start();
|
||||
daemonThread.setDaemon(true);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class SharedObjectWithLockManualTest {
|
||||
|
||||
@Test
|
||||
public void whenLockAcquired_ThenLockedIsTrue() {
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
final int threadCount = 2;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(true, object.isLocked());
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenLocked_ThenQueuedThread() {
|
||||
final int threadCount = 4;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(object.hasQueuedThreads(), true);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
public void whenTryLock_ThenQueuedThread() {
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
final int threadCount = 2;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(true, object.isLocked());
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenGetCount_ThenCorrectCount() throws InterruptedException {
|
||||
final int threadCount = 4;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
executeThreads(object, threadCount, service);
|
||||
Thread.sleep(1000);
|
||||
assertEquals(object.getCounter(), 4);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
private void executeThreads(SharedObjectWithLock object, int threadCount, ExecutorService service) {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
service.execute(object::perform);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
public class SynchronizedHashMapWithRWLockManualTest {
|
||||
|
||||
@Test
|
||||
public void whenWriting_ThenNoReading() {
|
||||
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
|
||||
final int threadCount = 3;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeWriterThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(object.isReadLockAvailable(), false);
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenReading_ThenMultipleReadingAllowed() {
|
||||
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
|
||||
final int threadCount = 5;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
|
||||
executeReaderThreads(object, threadCount, service);
|
||||
|
||||
assertEquals(object.isReadLockAvailable(), true);
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
private void executeWriterThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
service.execute(() -> {
|
||||
try {
|
||||
object.put("key" + threadCount, "value" + threadCount);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void executeReaderThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
|
||||
for (int i = 0; i < threadCount; i++)
|
||||
service.execute(() -> object.get("key" + threadCount));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.concurrent.phaser;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Phaser;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
public class PhaserUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenPhaser_whenCoordinateWorksBetweenThreads_thenShouldCoordinateBetweenMultiplePhases() {
|
||||
//given
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
Phaser ph = new Phaser(1);
|
||||
assertEquals(0, ph.getPhase());
|
||||
|
||||
//when
|
||||
executorService.submit(new LongRunningAction("thread-1", ph));
|
||||
executorService.submit(new LongRunningAction("thread-2", ph));
|
||||
executorService.submit(new LongRunningAction("thread-3", ph));
|
||||
|
||||
//then
|
||||
ph.arriveAndAwaitAdvance();
|
||||
assertEquals(1, ph.getPhase());
|
||||
|
||||
//and
|
||||
executorService.submit(new LongRunningAction("thread-4", ph));
|
||||
executorService.submit(new LongRunningAction("thread-5", ph));
|
||||
ph.arriveAndAwaitAdvance();
|
||||
assertEquals(2, ph.getPhase());
|
||||
|
||||
|
||||
ph.arriveAndDeregister();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.baeldung.concurrent.prioritytaskexecution;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class PriorityJobSchedulerUnitTest {
|
||||
private static int POOL_SIZE = 1;
|
||||
private static int QUEUE_SIZE = 10;
|
||||
|
||||
@Test
|
||||
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
|
||||
Job job1 = new Job("Job1", JobPriority.LOW);
|
||||
Job job2 = new Job("Job2", JobPriority.MEDIUM);
|
||||
Job job3 = new Job("Job3", JobPriority.HIGH);
|
||||
Job job4 = new Job("Job4", JobPriority.MEDIUM);
|
||||
Job job5 = new Job("Job5", JobPriority.LOW);
|
||||
Job job6 = new Job("Job6", JobPriority.HIGH);
|
||||
|
||||
PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);
|
||||
|
||||
pjs.scheduleJob(job1);
|
||||
pjs.scheduleJob(job2);
|
||||
pjs.scheduleJob(job3);
|
||||
pjs.scheduleJob(job4);
|
||||
pjs.scheduleJob(job5);
|
||||
pjs.scheduleJob(job6);
|
||||
|
||||
// ensure no tasks is pending before closing the scheduler
|
||||
while (pjs.getQueuedTaskCount() != 0);
|
||||
|
||||
// delay to avoid job sleep (added for demo) being interrupted
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
pjs.closeScheduler();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SemaphoresManualTest {
|
||||
|
||||
// ========= login queue ======
|
||||
|
||||
@Test
|
||||
public void givenLoginQueue_whenReachLimit_thenBlocked() throws InterruptedException {
|
||||
final int slots = 10;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(loginQueue::tryLogin));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, loginQueue.availableSlots());
|
||||
assertFalse(loginQueue.tryLogin());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLoginQueue_whenLogout_thenSlotsAvailable() throws InterruptedException {
|
||||
final int slots = 10;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(loginQueue::tryLogin));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, loginQueue.availableSlots());
|
||||
loginQueue.logout();
|
||||
assertTrue(loginQueue.availableSlots() > 0);
|
||||
assertTrue(loginQueue.tryLogin());
|
||||
}
|
||||
|
||||
// ========= delay queue =======
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenReachLimit_thenBlocked() throws InterruptedException {
|
||||
final int slots = 50;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(delayQueue::tryAdd));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, delayQueue.availableSlots());
|
||||
assertFalse(delayQueue.tryAdd());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
|
||||
final int slots = 50;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(delayQueue::tryAdd));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, delayQueue.availableSlots());
|
||||
Thread.sleep(1000);
|
||||
assertTrue(delayQueue.availableSlots() > 0);
|
||||
assertTrue(delayQueue.tryAdd());
|
||||
}
|
||||
|
||||
// ========== mutex ========
|
||||
|
||||
@Test
|
||||
public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException {
|
||||
final int count = 5;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(count);
|
||||
final CounterUsingMutex counter = new CounterUsingMutex();
|
||||
IntStream.range(0, count)
|
||||
.forEach(user -> executorService.execute(() -> {
|
||||
try {
|
||||
counter.increase();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
executorService.shutdown();
|
||||
|
||||
assertTrue(counter.hasQueuedThreads());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException {
|
||||
final int count = 5;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(count);
|
||||
final CounterUsingMutex counter = new CounterUsingMutex();
|
||||
IntStream.range(0, count)
|
||||
.forEach(user -> executorService.execute(() -> {
|
||||
try {
|
||||
counter.increase();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
executorService.shutdown();
|
||||
assertTrue(counter.hasQueuedThreads());
|
||||
Thread.sleep(5000);
|
||||
assertFalse(counter.hasQueuedThreads());
|
||||
assertEquals(count, counter.getCount());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.baeldung.concurrent.volatilekeyword;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class SharedObjectManualTest {
|
||||
|
||||
@Test
|
||||
public void whenOneThreadWrites_thenVolatileReadsFromMainMemory() throws InterruptedException {
|
||||
SharedObject sharedObject = new SharedObject();
|
||||
|
||||
Thread writer = new Thread(() -> sharedObject.increamentCount());
|
||||
writer.start();
|
||||
Thread.sleep(100);
|
||||
|
||||
Thread readerOne = new Thread(() -> {
|
||||
int valueReadByThread2 = sharedObject.getCount();
|
||||
assertEquals(1, valueReadByThread2);
|
||||
});
|
||||
readerOne.start();
|
||||
|
||||
Thread readerTwo = new Thread(() -> {
|
||||
int valueReadByThread3 = sharedObject.getCount();
|
||||
assertEquals(1, valueReadByThread3);
|
||||
});
|
||||
readerTwo.start();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenTwoThreadWrites_thenVolatileReadsFromMainMemory() throws InterruptedException {
|
||||
SharedObject sharedObject = new SharedObject();
|
||||
Thread writerOne = new Thread(() -> sharedObject.increamentCount());
|
||||
writerOne.start();
|
||||
Thread.sleep(100);
|
||||
|
||||
Thread writerTwo = new Thread(() -> sharedObject.increamentCount());
|
||||
writerTwo.start();
|
||||
Thread.sleep(100);
|
||||
|
||||
Thread readerOne = new Thread(() -> {
|
||||
int valueReadByThread2 = sharedObject.getCount();
|
||||
assertEquals(2, valueReadByThread2);
|
||||
});
|
||||
readerOne.start();
|
||||
|
||||
Thread readerTwo = new Thread(() -> {
|
||||
int valueReadByThread3 = sharedObject.getCount();
|
||||
assertEquals(2, valueReadByThread3);
|
||||
});
|
||||
readerTwo.start();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.baeldung.java8;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.baeldung.forkjoin.CustomRecursiveAction;
|
||||
import com.baeldung.forkjoin.CustomRecursiveTask;
|
||||
import com.baeldung.forkjoin.util.PoolUtil;
|
||||
|
||||
public class Java8ForkJoinIntegrationTest {
|
||||
|
||||
private int[] arr;
|
||||
private CustomRecursiveTask customRecursiveTask;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
Random random = new Random();
|
||||
arr = new int[50];
|
||||
for (int i = 0; i < arr.length; i++) {
|
||||
arr[i] = random.nextInt(35);
|
||||
}
|
||||
customRecursiveTask = new CustomRecursiveTask(arr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void callPoolUtil_whenExistsAndExpectedType_thenCorrect() {
|
||||
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
|
||||
ForkJoinPool forkJoinPoolTwo = PoolUtil.forkJoinPool;
|
||||
|
||||
assertNotNull(forkJoinPool);
|
||||
assertEquals(2, forkJoinPool.getParallelism());
|
||||
assertEquals(forkJoinPool, forkJoinPoolTwo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void callCommonPool_whenExistsAndExpectedType_thenCorrect() {
|
||||
ForkJoinPool commonPool = ForkJoinPool.commonPool();
|
||||
ForkJoinPool commonPoolTwo = ForkJoinPool.commonPool();
|
||||
|
||||
assertNotNull(commonPool);
|
||||
assertEquals(commonPool, commonPoolTwo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void executeRecursiveAction_whenExecuted_thenCorrect() {
|
||||
|
||||
CustomRecursiveAction myRecursiveAction = new CustomRecursiveAction("ddddffffgggghhhh");
|
||||
ForkJoinPool.commonPool().invoke(myRecursiveAction);
|
||||
|
||||
assertTrue(myRecursiveAction.isDone());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void executeRecursiveTask_whenExecuted_thenCorrect() {
|
||||
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
|
||||
|
||||
forkJoinPool.execute(customRecursiveTask);
|
||||
int result = customRecursiveTask.join();
|
||||
assertTrue(customRecursiveTask.isDone());
|
||||
|
||||
forkJoinPool.submit(customRecursiveTask);
|
||||
int resultTwo = customRecursiveTask.join();
|
||||
assertTrue(customRecursiveTask.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void executeRecursiveTaskWithFJ_whenExecuted_thenCorrect() {
|
||||
CustomRecursiveTask customRecursiveTaskFirst = new CustomRecursiveTask(arr);
|
||||
CustomRecursiveTask customRecursiveTaskSecond = new CustomRecursiveTask(arr);
|
||||
CustomRecursiveTask customRecursiveTaskLast = new CustomRecursiveTask(arr);
|
||||
|
||||
customRecursiveTaskFirst.fork();
|
||||
customRecursiveTaskSecond.fork();
|
||||
customRecursiveTaskLast.fork();
|
||||
int result = 0;
|
||||
result += customRecursiveTaskLast.join();
|
||||
result += customRecursiveTaskSecond.join();
|
||||
result += customRecursiveTaskFirst.join();
|
||||
|
||||
assertTrue(customRecursiveTaskFirst.isDone());
|
||||
assertTrue(customRecursiveTaskSecond.isDone());
|
||||
assertTrue(customRecursiveTaskLast.isDone());
|
||||
assertTrue(result != 0);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.baeldung.parameters;
|
||||
|
||||
import com.baeldung.concurrent.parameter.AverageCalculator;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ParameterizedThreadUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenSendingParameterToCallable_thenSuccessful() throws Exception {
|
||||
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
Future<Double> result = executorService.submit(new AverageCalculator(1, 2, 3));
|
||||
try {
|
||||
assertEquals(Double.valueOf(2.0), result.get());
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenParametersToThreadWithLamda_thenParametersPassedCorrectly() throws Exception {
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
int[] numbers = new int[] { 4, 5, 6 };
|
||||
try {
|
||||
Future<Integer> sumResult = executorService.submit(() -> IntStream.of(numbers)
|
||||
.sum());
|
||||
Future<Double> averageResult = executorService.submit(() -> IntStream.of(numbers)
|
||||
.average()
|
||||
.orElse(0d));
|
||||
assertEquals(Integer.valueOf(15), sumResult.get());
|
||||
assertEquals(Double.valueOf(5.0), averageResult.get());
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
package com.baeldung.thread.join;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Demonstrates Thread.join behavior.
|
||||
*
|
||||
*/
|
||||
public class ThreadJoinUnitTest {
|
||||
|
||||
final static Logger LOGGER = Logger.getLogger(ThreadJoinUnitTest.class.getName());
|
||||
|
||||
class SampleThread extends Thread {
|
||||
public int processingCount = 0;
|
||||
|
||||
SampleThread(int processingCount) {
|
||||
this.processingCount = processingCount;
|
||||
LOGGER.info("Thread " + this.getName() + " created");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOGGER.info("Thread " + this.getName() + " started");
|
||||
while (processingCount > 0) {
|
||||
try {
|
||||
Thread.sleep(1000); // Simulate some work being done by thread
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.info("Thread " + this.getName() + " interrupted.");
|
||||
}
|
||||
processingCount--;
|
||||
LOGGER.info("Inside Thread " + this.getName() + ", processingCount = " + processingCount);
|
||||
}
|
||||
LOGGER.info("Thread " + this.getName() + " exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenNewThread_whenJoinCalled_returnsImmediately() throws InterruptedException {
|
||||
Thread t1 = new SampleThread(0);
|
||||
LOGGER.info("Invoking join.");
|
||||
t1.join();
|
||||
LOGGER.info("Returned from join");
|
||||
LOGGER.info("Thread state is" + t1.getState());
|
||||
assertFalse(t1.isAlive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenStartedThread_whenJoinCalled_waitsTillCompletion()
|
||||
throws InterruptedException {
|
||||
Thread t2 = new SampleThread(1);
|
||||
t2.start();
|
||||
LOGGER.info("Invoking join.");
|
||||
t2.join();
|
||||
LOGGER.info("Returned from join");
|
||||
assertFalse(t2.isAlive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenStartedThread_whenTimedJoinCalled_waitsUntilTimedout()
|
||||
throws InterruptedException {
|
||||
Thread t3 = new SampleThread(10);
|
||||
t3.start();
|
||||
t3.join(1000);
|
||||
assertTrue(t3.isAlive());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void givenThreadTerminated_checkForEffect_notGuaranteed()
|
||||
throws InterruptedException {
|
||||
SampleThread t4 = new SampleThread(10);
|
||||
t4.start();
|
||||
//not guaranteed to stop even if t4 finishes.
|
||||
do {
|
||||
|
||||
} while (t4.processingCount > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenJoinWithTerminatedThread_checkForEffect_guaranteed()
|
||||
throws InterruptedException {
|
||||
SampleThread t4 = new SampleThread(10);
|
||||
t4.start();
|
||||
do {
|
||||
t4.join(100);
|
||||
} while (t4.processingCount > 0);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.baeldung.threadlocal;
|
||||
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ThreadLocalIntegrationTest {
|
||||
@Test
|
||||
public void givenThreadThatStoresContextInAMap_whenStartThread_thenShouldSetContextForBothUsers() throws ExecutionException, InterruptedException {
|
||||
//when
|
||||
SharedMapWithUserContext firstUser = new SharedMapWithUserContext(1);
|
||||
SharedMapWithUserContext secondUser = new SharedMapWithUserContext(2);
|
||||
new Thread(firstUser).start();
|
||||
new Thread(secondUser).start();
|
||||
|
||||
Thread.sleep(3000);
|
||||
//then
|
||||
assertEquals(SharedMapWithUserContext.userContextPerUserId.size(), 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenThreadThatStoresContextInThreadLocal_whenStartThread_thenShouldStoreContextInThreadLocal() throws ExecutionException, InterruptedException {
|
||||
//when
|
||||
ThreadLocalWithUserContext firstUser = new ThreadLocalWithUserContext(1);
|
||||
ThreadLocalWithUserContext secondUser = new ThreadLocalWithUserContext(2);
|
||||
new Thread(firstUser).start();
|
||||
new Thread(secondUser).start();
|
||||
|
||||
Thread.sleep(3000);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.baeldung.threadlocalrandom;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ThreadLocalRandomIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void givenUsingThreadLocalRandom_whenGeneratingRandomIntBounded_thenCorrect() {
|
||||
int leftLimit = 1;
|
||||
int rightLimit = 100;
|
||||
int generatedInt = ThreadLocalRandom.current().nextInt(leftLimit, rightLimit);
|
||||
|
||||
assertTrue(generatedInt < rightLimit && generatedInt >= leftLimit);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenUsingThreadLocalRandom_whenGeneratingRandomIntUnbounded_thenCorrect() {
|
||||
int generatedInt = ThreadLocalRandom.current().nextInt();
|
||||
|
||||
assertTrue(generatedInt < Integer.MAX_VALUE && generatedInt >= Integer.MIN_VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenUsingThreadLocalRandom_whenGeneratingRandomLongBounded_thenCorrect() {
|
||||
long leftLimit = 1L;
|
||||
long rightLimit = 100L;
|
||||
long generatedLong = ThreadLocalRandom.current().nextLong(leftLimit, rightLimit);
|
||||
|
||||
assertTrue(generatedLong < rightLimit && generatedLong >= leftLimit);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenUsingThreadLocalRandom_whenGeneratingRandomLongUnbounded_thenCorrect() {
|
||||
long generatedInt = ThreadLocalRandom.current().nextLong();
|
||||
|
||||
assertTrue(generatedInt < Long.MAX_VALUE && generatedInt >= Long.MIN_VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenUsingThreadLocalRandom_whenGeneratingRandomDoubleBounded_thenCorrect() {
|
||||
double leftLimit = 1D;
|
||||
double rightLimit = 100D;
|
||||
double generatedInt = ThreadLocalRandom.current().nextDouble(leftLimit, rightLimit);
|
||||
|
||||
assertTrue(generatedInt < rightLimit && generatedInt >= leftLimit);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenUsingThreadLocalRandom_whenGeneratingRandomDoubleUnbounded_thenCorrect() {
|
||||
double generatedInt = ThreadLocalRandom.current().nextDouble();
|
||||
|
||||
assertTrue(generatedInt < Double.MAX_VALUE && generatedInt >= Double.MIN_VALUE);
|
||||
}
|
||||
|
||||
@Test(expected = UnsupportedOperationException.class)
|
||||
public void givenUsingThreadLocalRandom_whenSettingSeed_thenThrowUnsupportedOperationException() {
|
||||
ThreadLocalRandom.current().setSeed(0l);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class CoreThreadPoolIntegrationTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CoreThreadPoolIntegrationTest.class);
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenCallingExecuteWithRunnable_thenRunnableIsExecuted() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(1);
|
||||
|
||||
Executor executor = Executors.newSingleThreadExecutor();
|
||||
executor.execute(() -> {
|
||||
LOG.debug("Hello World");
|
||||
lock.countDown();
|
||||
});
|
||||
|
||||
lock.await(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingExecutorServiceAndFuture_thenCanWaitOnFutureResult() throws InterruptedException, ExecutionException {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
Future<String> future = executorService.submit(() -> "Hello World");
|
||||
String result = future.get();
|
||||
|
||||
assertEquals("Hello World", result);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingFixedThreadPool_thenCoreAndMaximumThreadSizeAreTheSame() {
|
||||
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
|
||||
assertEquals(2, executor.getPoolSize());
|
||||
assertEquals(1, executor.getQueue().size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingCachedThreadPool_thenPoolSizeGrowsUnbounded() {
|
||||
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
executor.submit(() -> {
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
});
|
||||
|
||||
assertEquals(3, executor.getPoolSize());
|
||||
assertEquals(0, executor.getQueue().size());
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenUsingSingleThreadPool_thenTasksExecuteSequentially() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(2);
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
executor.submit(() -> {
|
||||
counter.set(1);
|
||||
lock.countDown();
|
||||
});
|
||||
executor.submit(() -> {
|
||||
counter.compareAndSet(1, 2);
|
||||
lock.countDown();
|
||||
});
|
||||
|
||||
lock.await(1000, TimeUnit.MILLISECONDS);
|
||||
assertEquals(2, counter.get());
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenSchedulingTask_thenTaskExecutesWithinGivenPeriod() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(1);
|
||||
|
||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
|
||||
executor.schedule(() -> {
|
||||
LOG.debug("Hello World");
|
||||
lock.countDown();
|
||||
}, 500, TimeUnit.MILLISECONDS);
|
||||
|
||||
lock.await(1000, TimeUnit.MILLISECONDS);
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void whenSchedulingTaskWithFixedPeriod_thenTaskExecutesMultipleTimes() throws InterruptedException {
|
||||
|
||||
CountDownLatch lock = new CountDownLatch(3);
|
||||
|
||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
|
||||
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
|
||||
LOG.debug("Hello World");
|
||||
lock.countDown();
|
||||
}, 500, 100, TimeUnit.MILLISECONDS);
|
||||
|
||||
lock.await();
|
||||
future.cancel(true);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenUsingForkJoinPool_thenSumOfTreeElementsIsCalculatedCorrectly() {
|
||||
|
||||
TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8)));
|
||||
|
||||
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
|
||||
int sum = forkJoinPool.invoke(new CountingTask(tree));
|
||||
|
||||
assertEquals(20, sum);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.baeldung.threadpool;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class GuavaThreadPoolIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void whenExecutingTaskWithDirectExecutor_thenTheTaskIsExecutedInTheCurrentThread() {
|
||||
|
||||
Executor executor = MoreExecutors.directExecutor();
|
||||
|
||||
AtomicBoolean executed = new AtomicBoolean();
|
||||
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
executed.set(true);
|
||||
});
|
||||
|
||||
assertTrue(executed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenJoiningFuturesWithAllAsList_thenCombinedFutureCompletesAfterAllFuturesComplete() throws ExecutionException, InterruptedException {
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
|
||||
|
||||
ListenableFuture<String> future1 = listeningExecutorService.submit(() -> "Hello");
|
||||
ListenableFuture<String> future2 = listeningExecutorService.submit(() -> "World");
|
||||
|
||||
String greeting = Futures.allAsList(future1, future2).get().stream().collect(Collectors.joining(" "));
|
||||
assertEquals("Hello World", greeting);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
13
core-java-modules/core-java-concurrency-advanced/src/test/resources/.gitignore
vendored
Normal file
13
core-java-modules/core-java-concurrency-advanced/src/test/resources/.gitignore
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
*.class
|
||||
|
||||
#folders#
|
||||
/target
|
||||
/neoDb*
|
||||
/data
|
||||
/src/main/webapp/WEB-INF/classes
|
||||
*/META-INF/*
|
||||
|
||||
# Packaged files #
|
||||
*.jar
|
||||
*.war
|
||||
*.ear
|
||||
Reference in New Issue
Block a user