Merge remote-tracking branch 'origin/master'
This commit is contained in:
@@ -0,0 +1,83 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import com.lmax.disruptor.BusySpinWaitStrategy;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.WaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import com.lmax.disruptor.util.DaemonThreadFactory;
|
||||
|
||||
public class DisruptorTest {
|
||||
private Disruptor<ValueEvent> disruptor;
|
||||
private WaitStrategy waitStrategy;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
waitStrategy = new BusySpinWaitStrategy();
|
||||
}
|
||||
|
||||
private void createDisruptor(final ProducerType producerType, final EventConsumer eventConsumer) {
|
||||
final ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
|
||||
disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, 16, threadFactory, producerType, waitStrategy);
|
||||
disruptor.handleEventsWith(eventConsumer.getEventHandler());
|
||||
}
|
||||
|
||||
private void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count, final EventProducer eventProducer) {
|
||||
eventProducer.startProducing(ringBuffer, count);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenMultipleProducerSingleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new SingleEventPrintConsumer();
|
||||
final EventProducer eventProducer = new DelayedMultiEventProducer();
|
||||
createDisruptor(ProducerType.MULTI, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSingleProducerSingleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new SingleEventConsumer();
|
||||
final EventProducer eventProducer = new SingleEventProducer();
|
||||
createDisruptor(ProducerType.SINGLE, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSingleProducerMultipleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new MultiEventConsumer();
|
||||
final EventProducer eventProducer = new SingleEventProducer();
|
||||
createDisruptor(ProducerType.SINGLE, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenMultipleProducerMultipleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new MultiEventPrintConsumer();
|
||||
final EventProducer eventProducer = new DelayedMultiEventProducer();
|
||||
createDisruptor(ProducerType.MULTI, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class MultiEventConsumer implements EventConsumer {
|
||||
|
||||
private int expectedValue = -1;
|
||||
private int otherExpectedValue = -1;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue());
|
||||
final EventHandler<ValueEvent> otherEventHandler = (event, sequence, endOfBatch) -> assertOtherExpectedValue(event.getValue());
|
||||
return new EventHandler[] { eventHandler, otherEventHandler };
|
||||
}
|
||||
|
||||
private void assertExpectedValue(final int id) {
|
||||
assertEquals(++expectedValue, id);
|
||||
}
|
||||
|
||||
private void assertOtherExpectedValue(final int id) {
|
||||
assertEquals(++otherExpectedValue, id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class SingleEventConsumer implements EventConsumer {
|
||||
|
||||
private int expectedValue = -1;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue());
|
||||
return new EventHandler[] { eventHandler };
|
||||
}
|
||||
|
||||
private void assertExpectedValue(final int id) {
|
||||
assertEquals(++expectedValue, id);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user