Merge branch 'master' of https://github.com/eugenp/tutorials into ABA_problem

This commit is contained in:
Gergo Petrik
2020-05-17 08:21:35 +02:00
769 changed files with 10047 additions and 2765 deletions

View File

@@ -11,4 +11,5 @@ This module contains articles about advanced topics about multithreading with co
- [Guide to Work Stealing in Java](https://www.baeldung.com/java-work-stealing)
- [Asynchronous Programming in Java](https://www.baeldung.com/java-asynchronous-programming)
- [Java Thread Deadlock and Livelock](https://www.baeldung.com/java-deadlock-livelock)
- [Guide to AtomicStampedReference in Java](https://www.baeldung.com/java-atomicstampedreference)
- [[<-- previous]](/core-java-modules/core-java-concurrency-advanced-2)

View File

@@ -0,0 +1,60 @@
package com.baeldung.deadlockAndLivelock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DeadlockExample {
private Lock lock1 = new ReentrantLock(true);
private Lock lock2 = new ReentrantLock(true);
public static void main(String[] args) {
DeadlockExample deadlock = new DeadlockExample();
new Thread(deadlock::operation1, "T1").start();
new Thread(deadlock::operation2, "T2").start();
}
public void operation1() {
lock1.lock();
print("lock1 acquired, waiting to acquire lock2.");
sleep(50);
lock2.lock();
print("lock2 acquired");
print("executing first operation.");
lock2.unlock();
lock1.unlock();
}
public void operation2() {
lock2.lock();
print("lock2 acquired, waiting to acquire lock1.");
sleep(50);
lock1.lock();
print("lock1 acquired");
print("executing second operation.");
lock1.unlock();
lock2.unlock();
}
public void print(String message) {
System.out.println("Thread " + Thread.currentThread()
.getName() + ": " + message);
}
public void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,86 @@
package com.baeldung.deadlockAndLivelock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LivelockExample {
private Lock lock1 = new ReentrantLock(true);
private Lock lock2 = new ReentrantLock(true);
public static void main(String[] args) {
LivelockExample livelock = new LivelockExample();
new Thread(livelock::operation1, "T1").start();
new Thread(livelock::operation2, "T2").start();
}
public void operation1() {
while (true) {
tryLock(lock1, 50);
print("lock1 acquired, trying to acquire lock2.");
sleep(50);
if (tryLock(lock2)) {
print("lock2 acquired.");
} else {
print("cannot acquire lock2, releasing lock1.");
lock1.unlock();
continue;
}
print("executing first operation.");
break;
}
lock2.unlock();
lock1.unlock();
}
public void operation2() {
while (true) {
tryLock(lock2, 50);
print("lock2 acquired, trying to acquire lock1.");
sleep(50);
if (tryLock(lock1)) {
print("lock1 acquired.");
} else {
print("cannot acquire lock1, releasing lock2.");
lock2.unlock();
continue;
}
print("executing second operation.");
break;
}
lock1.unlock();
lock2.unlock();
}
public void print(String message) {
System.out.println("Thread " + Thread.currentThread()
.getName() + ": " + message);
}
public void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void tryLock(Lock lock, long millis) {
try {
lock.tryLock(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean tryLock(Lock lock) {
return lock.tryLock();
}
}

View File

@@ -0,0 +1,81 @@
package com.baeldung.lockfree;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class NonBlockingQueue<T> {
private final AtomicReference<Node<T>> head, tail;
private final AtomicInteger size;
public NonBlockingQueue() {
head = new AtomicReference<>(null);
tail = new AtomicReference<>(null);
size = new AtomicInteger();
size.set(0);
}
public void add(T element) {
if (element == null) {
throw new NullPointerException();
}
Node<T> node = new Node<>(element);
Node<T> currentTail;
do {
currentTail = tail.get();
node.setPrevious(currentTail);
} while(!tail.compareAndSet(currentTail, node));
if(node.previous != null) {
node.previous.next = node;
}
head.compareAndSet(null, node); //if we are inserting the first element
size.incrementAndGet();
}
public T get() {
if(head.get() == null) {
throw new NoSuchElementException();
}
Node<T> currentHead;
Node<T> nextNode;
do {
currentHead = head.get();
nextNode = currentHead.getNext();
} while(!head.compareAndSet(currentHead, nextNode));
size.decrementAndGet();
return currentHead.getValue();
}
public int size() {
return this.size.get();
}
private class Node<T> {
private final T value;
private volatile Node<T> next;
private volatile Node<T> previous;
public Node(T value) {
this.value = value;
this.next = null;
}
public T getValue() {
return value;
}
public Node<T> getNext() {
return next;
}
public void setPrevious(Node<T> previous) {
this.previous = previous;
}
}
}

View File

@@ -0,0 +1,83 @@
package com.baeldung.exchanger;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import static java.util.concurrent.CompletableFuture.runAsync;
public class ExchangerPipeLineManualTest {
private static final int BUFFER_SIZE = 100;
@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException, ExecutionException {
Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
int counter = 0;
Runnable reader = () -> {
Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
while (true) {
readerBuffer.add(UUID.randomUUID().toString());
if (readerBuffer.size() >= BUFFER_SIZE) {
try {
readerBuffer = readerExchanger.exchange(readerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
};
Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writterBuffer = new ConcurrentLinkedQueue<>();
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
while (true) {
writterBuffer.add(processorBuffer.poll());
if (processorBuffer.isEmpty()) {
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
writterBuffer = writerExchanger.exchange(writterBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
Runnable writer = () -> {
Queue<String> writterBuffer = new ConcurrentLinkedQueue<>();
try {
writterBuffer = writerExchanger.exchange(writterBuffer);
while (true) {
System.out.println(writterBuffer.poll());
if (writterBuffer.isEmpty()) {
writterBuffer = writerExchanger.exchange(writterBuffer);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(runAsync(reader), runAsync(processor), runAsync(writer)).get();
}
}

View File

@@ -0,0 +1,63 @@
package com.baeldung.exchanger;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import static java.util.concurrent.CompletableFuture.runAsync;
public class ExchangerUnitTest {
@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
Exchanger<String> exchanger = new Exchanger<>();
Runnable taskA = () -> {
try {
String message = exchanger.exchange("from A");
assertEquals("from B", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
Runnable taskB = () -> {
try {
String message = exchanger.exchange("from B");
assertEquals("from A", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(runAsync(taskA), runAsync(taskB)).join();
}
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException, ExecutionException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result = CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
}