Merge branch 'master' of https://github.com/eugenp/tutorials into BAEL-614
This commit is contained in:
@@ -0,0 +1,23 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class BrokenWorker implements Runnable {
|
||||
private final List<String> outputScraper;
|
||||
private final CountDownLatch countDownLatch;
|
||||
|
||||
public BrokenWorker(final List<String> outputScraper, final CountDownLatch countDownLatch) {
|
||||
this.outputScraper = outputScraper;
|
||||
this.countDownLatch = countDownLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (true) {
|
||||
throw new RuntimeException("Oh dear");
|
||||
}
|
||||
countDownLatch.countDown();
|
||||
outputScraper.add("Counted down");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class WaitingWorker implements Runnable {
|
||||
|
||||
private final List<String> outputScraper;
|
||||
private final CountDownLatch readyThreadCounter;
|
||||
private final CountDownLatch callingThreadBlocker;
|
||||
private final CountDownLatch completedThreadCounter;
|
||||
|
||||
public WaitingWorker(final List<String> outputScraper,
|
||||
final CountDownLatch readyThreadCounter,
|
||||
final CountDownLatch callingThreadBlocker,
|
||||
CountDownLatch completedThreadCounter) {
|
||||
|
||||
this.outputScraper = outputScraper;
|
||||
this.readyThreadCounter = readyThreadCounter;
|
||||
this.callingThreadBlocker = callingThreadBlocker;
|
||||
this.completedThreadCounter = completedThreadCounter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// Mark this thread as read / started
|
||||
readyThreadCounter.countDown();
|
||||
try {
|
||||
callingThreadBlocker.await();
|
||||
outputScraper.add("Counted down");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
completedThreadCounter.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.concurrent.countdownlatch;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Worker implements Runnable {
|
||||
private final List<String> outputScraper;
|
||||
private final CountDownLatch countDownLatch;
|
||||
|
||||
public Worker(final List<String> outputScraper, final CountDownLatch countDownLatch) {
|
||||
this.outputScraper = outputScraper;
|
||||
this.countDownLatch = countDownLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// Do some work
|
||||
System.out.println("Doing some logic");
|
||||
countDownLatch.countDown();
|
||||
outputScraper.add("Counted down");
|
||||
}
|
||||
}
|
||||
@@ -1,34 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
public class DelayedMultiEventProducer implements EventProducer {
|
||||
|
||||
@Override
|
||||
public void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count) {
|
||||
final Runnable simpleProducer = () -> produce(ringBuffer, count, false);
|
||||
final Runnable delayedProducer = () -> produce(ringBuffer, count, true);
|
||||
new Thread(simpleProducer).start();
|
||||
new Thread(delayedProducer).start();
|
||||
}
|
||||
|
||||
private void produce(final RingBuffer<ValueEvent> ringBuffer, final int count, final boolean addDelay) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
final long seq = ringBuffer.next();
|
||||
final ValueEvent valueEvent = ringBuffer.get(seq);
|
||||
valueEvent.setValue(i);
|
||||
ringBuffer.publish(seq);
|
||||
if (addDelay) {
|
||||
addDelay();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addDelay() {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException interruptedException) {
|
||||
// No-Op lets swallow it
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
/**
|
||||
* Consumer that consumes event from ring buffer.
|
||||
*/
|
||||
public interface EventConsumer {
|
||||
/**
|
||||
* One or more event handler to handle event from ring buffer.
|
||||
*/
|
||||
public EventHandler<ValueEvent>[] getEventHandler();
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
/**
|
||||
* Producer that produces event for ring buffer.
|
||||
*/
|
||||
public interface EventProducer {
|
||||
/**
|
||||
* Start the producer that would start producing the values.
|
||||
* @param ringBuffer
|
||||
* @param count
|
||||
*/
|
||||
public void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count);
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class MultiEventPrintConsumer implements EventConsumer {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||
final EventHandler<ValueEvent> otherEventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||
return new EventHandler[] { eventHandler, otherEventHandler };
|
||||
}
|
||||
|
||||
private void print(final int id, final long sequenceId) {
|
||||
logger.info("Id is " + id + " sequence id that was used is " + sequenceId);
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class SingleEventPrintConsumer implements EventConsumer {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||
return new EventHandler[] { eventHandler };
|
||||
}
|
||||
|
||||
private void print(final int id, final long sequenceId) {
|
||||
logger.info("Id is " + id + " sequence id that was used is " + sequenceId);
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
public class SingleEventProducer implements EventProducer {
|
||||
|
||||
@Override
|
||||
public void startProducing(RingBuffer<ValueEvent> ringBuffer, int count) {
|
||||
final Runnable producer = () -> produce(ringBuffer, count);
|
||||
new Thread(producer).start();
|
||||
}
|
||||
|
||||
private void produce(final RingBuffer<ValueEvent> ringBuffer, final int count) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
final long seq = ringBuffer.next();
|
||||
final ValueEvent valueEvent = ringBuffer.get(seq);
|
||||
valueEvent.setValue(i);
|
||||
ringBuffer.publish(seq);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
public final class ValueEvent {
|
||||
|
||||
private int value;
|
||||
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void setValue(int value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public final static EventFactory<ValueEvent> EVENT_FACTORY = () -> new ValueEvent();
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return ToStringBuilder.reflectionToString(this);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user