moved semaphore examples from core-java-concurrency-advanced to core-java-concurrency-advanced-2
This commit is contained in:
@@ -3,24 +3,4 @@
|
||||
## Core Java Concurrency Advanced Examples
|
||||
|
||||
### Relevant Articles:
|
||||
- [Introduction to Thread Pools in Java](https://www.baeldung.com/thread-pool-java-and-guava)
|
||||
- [Guide to CountDownLatch in Java](https://www.baeldung.com/java-countdown-latch)
|
||||
- [Guide to java.util.concurrent.Locks](https://www.baeldung.com/java-concurrent-locks)
|
||||
- [An Introduction to ThreadLocal in Java](https://www.baeldung.com/java-threadlocal)
|
||||
- [LongAdder and LongAccumulator in Java](https://www.baeldung.com/java-longadder-and-longaccumulator)
|
||||
- [The Dining Philosophers Problem in Java](https://www.baeldung.com/java-dining-philoshophers)
|
||||
- [Guide to the Java Phaser](https://www.baeldung.com/java-phaser)
|
||||
- [An Introduction to Atomic Variables in Java](https://www.baeldung.com/java-atomic-variables)
|
||||
- [CyclicBarrier in Java](https://www.baeldung.com/java-cyclic-barrier)
|
||||
- [Guide to the Volatile Keyword in Java](https://www.baeldung.com/java-volatile)
|
||||
|
||||
- [Semaphores in Java](https://www.baeldung.com/java-semaphore)
|
||||
- [Daemon Threads in Java](https://www.baeldung.com/java-daemon-thread)
|
||||
- [Priority-based Job Scheduling in Java](https://www.baeldung.com/java-priority-job-schedule)
|
||||
- [Brief Introduction to Java Thread.yield()](https://www.baeldung.com/java-thread-yield)
|
||||
- [Print Even and Odd Numbers Using 2 Threads](https://www.baeldung.com/java-even-odd-numbers-with-2-threads)
|
||||
- [Java CyclicBarrier vs CountDownLatch](https://www.baeldung.com/java-cyclicbarrier-countdownlatch)
|
||||
- [Guide to the Fork/Join Framework in Java](https://www.baeldung.com/java-fork-join)
|
||||
- [Guide to ThreadLocalRandom in Java](https://www.baeldung.com/java-thread-local-random)
|
||||
- [The Thread.join() Method in Java](https://www.baeldung.com/java-thread-join)
|
||||
- [Passing Parameters to Java Threads](https://www.baeldung.com/java-thread-parameters)
|
||||
|
||||
@@ -15,6 +15,18 @@
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons-lang3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
@@ -28,6 +40,7 @@
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
class CounterUsingMutex {
|
||||
|
||||
private final Semaphore mutex;
|
||||
private int count;
|
||||
|
||||
CounterUsingMutex() {
|
||||
mutex = new Semaphore(1);
|
||||
count = 0;
|
||||
}
|
||||
|
||||
void increase() throws InterruptedException {
|
||||
mutex.acquire();
|
||||
this.count = this.count + 1;
|
||||
Thread.sleep(1000);
|
||||
mutex.release();
|
||||
|
||||
}
|
||||
|
||||
int getCount() {
|
||||
return this.count;
|
||||
}
|
||||
|
||||
boolean hasQueuedThreads() {
|
||||
return mutex.hasQueuedThreads();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.TimedSemaphore;
|
||||
|
||||
class DelayQueueUsingTimedSemaphore {
|
||||
|
||||
private final TimedSemaphore semaphore;
|
||||
|
||||
DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
|
||||
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
|
||||
}
|
||||
|
||||
boolean tryAdd() {
|
||||
return semaphore.tryAcquire();
|
||||
}
|
||||
|
||||
int availableSlots() {
|
||||
return semaphore.getAvailablePermits();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
class LoginQueueUsingSemaphore {
|
||||
|
||||
private final Semaphore semaphore;
|
||||
|
||||
LoginQueueUsingSemaphore(int slotLimit) {
|
||||
semaphore = new Semaphore(slotLimit);
|
||||
}
|
||||
|
||||
boolean tryLogin() {
|
||||
return semaphore.tryAcquire();
|
||||
}
|
||||
|
||||
void logout() {
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
int availableSlots() {
|
||||
return semaphore.availablePermits();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
package com.baeldung.concurrent.semaphores;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SemaphoresManualTest {
|
||||
|
||||
// ========= login queue ======
|
||||
|
||||
@Test
|
||||
public void givenLoginQueue_whenReachLimit_thenBlocked() throws InterruptedException {
|
||||
final int slots = 10;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(loginQueue::tryLogin));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, loginQueue.availableSlots());
|
||||
assertFalse(loginQueue.tryLogin());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLoginQueue_whenLogout_thenSlotsAvailable() throws InterruptedException {
|
||||
final int slots = 10;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(loginQueue::tryLogin));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, loginQueue.availableSlots());
|
||||
loginQueue.logout();
|
||||
assertTrue(loginQueue.availableSlots() > 0);
|
||||
assertTrue(loginQueue.tryLogin());
|
||||
}
|
||||
|
||||
// ========= delay queue =======
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenReachLimit_thenBlocked() throws InterruptedException {
|
||||
final int slots = 50;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(delayQueue::tryAdd));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, delayQueue.availableSlots());
|
||||
assertFalse(delayQueue.tryAdd());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
|
||||
final int slots = 50;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
|
||||
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
|
||||
IntStream.range(0, slots)
|
||||
.forEach(user -> executorService.execute(delayQueue::tryAdd));
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(0, delayQueue.availableSlots());
|
||||
Thread.sleep(1000);
|
||||
assertTrue(delayQueue.availableSlots() > 0);
|
||||
assertTrue(delayQueue.tryAdd());
|
||||
}
|
||||
|
||||
// ========== mutex ========
|
||||
|
||||
@Test
|
||||
public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException {
|
||||
final int count = 5;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(count);
|
||||
final CounterUsingMutex counter = new CounterUsingMutex();
|
||||
IntStream.range(0, count)
|
||||
.forEach(user -> executorService.execute(() -> {
|
||||
try {
|
||||
counter.increase();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
executorService.shutdown();
|
||||
|
||||
assertTrue(counter.hasQueuedThreads());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException {
|
||||
final int count = 5;
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(count);
|
||||
final CounterUsingMutex counter = new CounterUsingMutex();
|
||||
IntStream.range(0, count)
|
||||
.forEach(user -> executorService.execute(() -> {
|
||||
try {
|
||||
counter.increase();
|
||||
} catch (final InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
executorService.shutdown();
|
||||
assertTrue(counter.hasQueuedThreads());
|
||||
Thread.sleep(5000);
|
||||
assertFalse(counter.hasQueuedThreads());
|
||||
assertEquals(count, counter.getCount());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user