BAEL-519 Disruptor in its own module. (#1017)
* BAL-36 File size api in java and apache commons IO * BAEL-282 grep in java - fixes after code review * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor * BAEL-519 Moved all supporting classes to main source * BAEL-519 Moved all supporting classes to main source * BAEL-519 Moved asserts and test classes in test folder. * BAEL-519 moved test related producer and consumer to src. * BAEL-586 Guide to Guava BiMap. * BAEL-587 formatted code. * BAEL-519 LMAX Disruptor * BAEL-587 resolved merge * BAEL-587 Resolved merge * BAEL-519 Removed disruptor link. * BAEL-519 Reverted Guava changes * RFQ-587 Added disruptor as a separate module. * BAEL-519 Disruptor changes. * BAEL-519 Removed disruptor from core-java module.
This commit is contained in:
0
disruptor/README.md
Normal file
0
disruptor/README.md
Normal file
251
disruptor/pom.xml
Normal file
251
disruptor/pom.xml
Normal file
@@ -0,0 +1,251 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>disruptor</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>disruptor</name>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- utils -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons-lang3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.lmax</groupId>
|
||||
<artifactId>disruptor</artifactId>
|
||||
<version>${disruptor.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- logging -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>${org.slf4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>${logback.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>jcl-over-slf4j</artifactId>
|
||||
<version>${org.slf4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<version>${org.slf4j.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- test scoped -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-all</artifactId>
|
||||
<version>1.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
<version>${org.hamcrest.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-library</artifactId>
|
||||
<version>${org.hamcrest.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>disruptor</finalName>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/resources</directory>
|
||||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${maven-surefire-plugin.version}</version>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>prepare-package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/libs</outputDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<addClasspath>true</addClasspath>
|
||||
<classpathPrefix>libs/</classpathPrefix>
|
||||
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<shadedArtifactAttached>true</shadedArtifactAttached>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>com.jolira</groupId>
|
||||
<artifactId>onejar-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<configuration>
|
||||
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
|
||||
<attachToBuild>true</attachToBuild>
|
||||
<filename>${project.build.finalName}-onejar.${project.packaging}</filename>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>one-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>integration</id>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>integration-test</phase>
|
||||
<goals>
|
||||
<goal>test</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<systemPropertyVariables>
|
||||
<test.mime>json</test.mime>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
<properties>
|
||||
<!-- logging -->
|
||||
<org.slf4j.version>1.7.21</org.slf4j.version>
|
||||
<logback.version>1.1.7</logback.version>
|
||||
|
||||
<!-- util -->
|
||||
<commons-lang3.version>3.5</commons-lang3.version>
|
||||
<disruptor.version>3.3.6</disruptor.version>
|
||||
|
||||
<!-- testing -->
|
||||
<org.hamcrest.version>1.3</org.hamcrest.version>
|
||||
<junit.version>4.12</junit.version>
|
||||
<mockito.version>1.10.19</mockito.version>
|
||||
<testng.version>6.10</testng.version>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
|
||||
<!-- maven plugins -->
|
||||
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
|
||||
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
|
||||
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
public class DelayedMultiEventProducer implements EventProducer {
|
||||
|
||||
@Override
|
||||
public void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count) {
|
||||
final Runnable simpleProducer = () -> produce(ringBuffer, count, false);
|
||||
final Runnable delayedProducer = () -> produce(ringBuffer, count, true);
|
||||
new Thread(simpleProducer).start();
|
||||
new Thread(delayedProducer).start();
|
||||
}
|
||||
|
||||
private void produce(final RingBuffer<ValueEvent> ringBuffer, final int count, final boolean addDelay) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
final long seq = ringBuffer.next();
|
||||
final ValueEvent valueEvent = ringBuffer.get(seq);
|
||||
valueEvent.setValue(i);
|
||||
ringBuffer.publish(seq);
|
||||
if (addDelay) {
|
||||
addDelay();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addDelay() {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException interruptedException) {
|
||||
// No-Op lets swallow it
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
/**
|
||||
* Consumer that consumes event from ring buffer.
|
||||
*/
|
||||
public interface EventConsumer {
|
||||
/**
|
||||
* One or more event handler to handle event from ring buffer.
|
||||
*/
|
||||
public EventHandler<ValueEvent>[] getEventHandler();
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
/**
|
||||
* Producer that produces event for ring buffer.
|
||||
*/
|
||||
public interface EventProducer {
|
||||
/**
|
||||
* Start the producer that would start producing the values.
|
||||
* @param ringBuffer
|
||||
* @param count
|
||||
*/
|
||||
public void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count);
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class MultiEventPrintConsumer implements EventConsumer {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||
final EventHandler<ValueEvent> otherEventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||
return new EventHandler[] { eventHandler, otherEventHandler };
|
||||
}
|
||||
|
||||
private void print(final int id, final long sequenceId) {
|
||||
logger.info("Id is " + id + " sequence id that was used is " + sequenceId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class SingleEventPrintConsumer implements EventConsumer {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||
return new EventHandler[] { eventHandler };
|
||||
}
|
||||
|
||||
private void print(final int id, final long sequenceId) {
|
||||
logger.info("Id is " + id + " sequence id that was used is " + sequenceId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
public class SingleEventProducer implements EventProducer {
|
||||
|
||||
@Override
|
||||
public void startProducing(RingBuffer<ValueEvent> ringBuffer, int count) {
|
||||
final Runnable producer = () -> produce(ringBuffer, count);
|
||||
new Thread(producer).start();
|
||||
}
|
||||
|
||||
private void produce(final RingBuffer<ValueEvent> ringBuffer, final int count) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
final long seq = ringBuffer.next();
|
||||
final ValueEvent valueEvent = ringBuffer.get(seq);
|
||||
valueEvent.setValue(i);
|
||||
ringBuffer.publish(seq);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
public final class ValueEvent {
|
||||
|
||||
private int value;
|
||||
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void setValue(int value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public final static EventFactory<ValueEvent> EVENT_FACTORY = () -> new ValueEvent();
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return ToStringBuilder.reflectionToString(this);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import com.lmax.disruptor.BusySpinWaitStrategy;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.WaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import com.lmax.disruptor.util.DaemonThreadFactory;
|
||||
|
||||
public class DisruptorTest {
|
||||
private Disruptor<ValueEvent> disruptor;
|
||||
private WaitStrategy waitStrategy;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
waitStrategy = new BusySpinWaitStrategy();
|
||||
}
|
||||
|
||||
private void createDisruptor(final ProducerType producerType, final EventConsumer eventConsumer) {
|
||||
final ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
|
||||
disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, 16, threadFactory, producerType, waitStrategy);
|
||||
disruptor.handleEventsWith(eventConsumer.getEventHandler());
|
||||
}
|
||||
|
||||
private void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count, final EventProducer eventProducer) {
|
||||
eventProducer.startProducing(ringBuffer, count);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenMultipleProducerSingleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new SingleEventPrintConsumer();
|
||||
final EventProducer eventProducer = new DelayedMultiEventProducer();
|
||||
createDisruptor(ProducerType.MULTI, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSingleProducerSingleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new SingleEventConsumer();
|
||||
final EventProducer eventProducer = new SingleEventProducer();
|
||||
createDisruptor(ProducerType.SINGLE, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSingleProducerMultipleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new MultiEventConsumer();
|
||||
final EventProducer eventProducer = new SingleEventProducer();
|
||||
createDisruptor(ProducerType.SINGLE, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenMultipleProducerMultipleConsumer_thenOutputInFifoOrder() {
|
||||
final EventConsumer eventConsumer = new MultiEventPrintConsumer();
|
||||
final EventProducer eventProducer = new DelayedMultiEventProducer();
|
||||
createDisruptor(ProducerType.MULTI, eventConsumer);
|
||||
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||
|
||||
startProducing(ringBuffer, 32, eventProducer);
|
||||
|
||||
disruptor.halt();
|
||||
disruptor.shutdown();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class MultiEventConsumer implements EventConsumer {
|
||||
|
||||
private int expectedValue = -1;
|
||||
private int otherExpectedValue = -1;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue());
|
||||
final EventHandler<ValueEvent> otherEventHandler = (event, sequence, endOfBatch) -> assertOtherExpectedValue(event.getValue());
|
||||
return new EventHandler[] { eventHandler, otherEventHandler };
|
||||
}
|
||||
|
||||
private void assertExpectedValue(final int id) {
|
||||
assertEquals(++expectedValue, id);
|
||||
}
|
||||
|
||||
private void assertOtherExpectedValue(final int id) {
|
||||
assertEquals(++otherExpectedValue, id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.disruptor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
|
||||
public class SingleEventConsumer implements EventConsumer {
|
||||
|
||||
private int expectedValue = -1;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue());
|
||||
return new EventHandler[] { eventHandler };
|
||||
}
|
||||
|
||||
private void assertExpectedValue(final int id) {
|
||||
assertEquals(++expectedValue, id);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user