diff --git a/libraries-7/README.md b/libraries-7/README.md new file mode 100644 index 0000000000..105a2ef16d --- /dev/null +++ b/libraries-7/README.md @@ -0,0 +1,11 @@ +## Libraries-7 + +This module contains articles about various Java libraries. +These are small libraries that are relatively easy to use and do not require any separate module of their own. + +The code examples related to different libraries are each in their own module. + +Remember, for advanced libraries like [Jackson](/jackson) and [JUnit](/testing-modules) we already have separate modules. Please make sure to have a look at the existing modules in such cases. + +### Relevant articles +- More articles [[<-- prev]](/libraries-6) diff --git a/libraries-7/pom.xml b/libraries-7/pom.xml new file mode 100644 index 0000000000..9bc6d2cf52 --- /dev/null +++ b/libraries-7/pom.xml @@ -0,0 +1,81 @@ + + + 4.0.0 + libraries-7 + + + parent-modules + com.baeldung + 1.0.0-SNAPSHOT + + + + + org.agrona + agrona + 1.17.1 + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.6.0 + + + generate-sources + + java + + + + + false + true + uk.co.real_logic.sbe.SbeTool + + + sbe.output.dir + ${project.build.directory}/generated-sources/java + + + + ${project.basedir}/src/main/resources/schema.xml + + ${project.build.directory}/generated-sources/java + + + + uk.co.real-logic + sbe-tool + 1.27.0 + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.0.0 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/java/ + + + + + + + + + diff --git a/libraries-7/src/main/java/com/baeldung/sbe/MarketData.java b/libraries-7/src/main/java/com/baeldung/sbe/MarketData.java new file mode 100644 index 0000000000..2aaa30608d --- /dev/null +++ b/libraries-7/src/main/java/com/baeldung/sbe/MarketData.java @@ -0,0 +1,98 @@ +package com.baeldung.sbe; + +import java.util.StringJoiner; + +import com.baeldung.sbe.stub.Currency; +import com.baeldung.sbe.stub.Market; + +public class MarketData { + + private final int amount; + private final double price; + private final Market market; + private final Currency currency; + private final String symbol; + + public MarketData(int amount, double price, Market market, Currency currency, String symbol) { + this.amount = amount; + this.price = price; + this.market = market; + this.currency = currency; + this.symbol = symbol; + } + + public static class Builder { + private int amount; + + public Builder amount(int amount) { + this.amount = amount; + return this; + } + + private double price; + + public Builder price(double price) { + this.price = price; + return this; + } + + private Market market; + + public Builder market(Market market) { + this.market = market; + return this; + } + + private Currency currency; + + public Builder currency(Currency currency) { + this.currency = currency; + return this; + } + + private String symbol; + + public Builder symbol(String symbol) { + this.symbol = symbol; + return this; + } + + public MarketData build() { + return new MarketData(amount, price, market, currency, symbol); + } + } + + public static Builder builder() { + return new Builder(); + } + + public int getAmount() { + return amount; + } + + public double getPrice() { + return price; + } + + public Market getMarket() { + return market; + } + + public Currency getCurrency() { + return currency; + } + + public String getSymbol() { + return symbol; + } + + @Override + public String toString() { + return new StringJoiner(", ", MarketData.class.getSimpleName() + "[", "]").add("amount=" + amount) + .add("price=" + price) + .add("market=" + market) + .add("currency=" + currency) + .add("symbol='" + symbol + "'") + .toString(); + } +} diff --git a/libraries-7/src/main/java/com/baeldung/sbe/MarketDataSource.java b/libraries-7/src/main/java/com/baeldung/sbe/MarketDataSource.java new file mode 100644 index 0000000000..3cf7339f08 --- /dev/null +++ b/libraries-7/src/main/java/com/baeldung/sbe/MarketDataSource.java @@ -0,0 +1,48 @@ +package com.baeldung.sbe; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; + +import com.baeldung.sbe.stub.Currency; +import com.baeldung.sbe.stub.Market; + +public class MarketDataSource implements Iterator { + + private final LinkedList dataQueue = new LinkedList<>(); + + public MarketDataSource() { + // adding some test data into queue + this.dataQueue.addAll(Arrays.asList(MarketData.builder() + .amount(1) + .market(Market.NASDAQ) + .symbol("AAPL") + .price(134.12) + .currency(Currency.USD) + .build(), MarketData.builder() + .amount(2) + .market(Market.NYSE) + .symbol("IBM") + .price(128.99) + .currency(Currency.USD) + .build(), MarketData.builder() + .amount(1) + .market(Market.NASDAQ) + .symbol("AXP") + .price(34.87) + .currency(Currency.EUR) + .build())); + } + + @Override + public boolean hasNext() { + return !this.dataQueue.isEmpty(); + } + + @Override + public MarketData next() { + final MarketData data = this.dataQueue.pop(); + this.dataQueue.add(data); + return data; + } +} diff --git a/libraries-7/src/main/java/com/baeldung/sbe/MarketDataStreamServer.java b/libraries-7/src/main/java/com/baeldung/sbe/MarketDataStreamServer.java new file mode 100644 index 0000000000..a8b5809658 --- /dev/null +++ b/libraries-7/src/main/java/com/baeldung/sbe/MarketDataStreamServer.java @@ -0,0 +1,92 @@ +package com.baeldung.sbe; + +import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MarketDataStreamServer { + + private static final Logger log = LoggerFactory.getLogger(MarketDataStreamServer.class); + + ByteBuffer buffer = ByteBuffer.allocate(128); + + final AtomicLong writePos = new AtomicLong(); + + ScheduledExecutorService writerThread = Executors.newScheduledThreadPool(1); + ScheduledExecutorService readerThreadPool = Executors.newScheduledThreadPool(2); + + private class Client { + + final String name; + final ByteBuffer readOnlyBuffer; + + final AtomicLong readPos = new AtomicLong(); + + Client(String name, ByteBuffer source) { + this.name = name; + this.readOnlyBuffer = source.asReadOnlyBuffer(); + } + + void readTradeData() { + while (readPos.get() < writePos.get()) { + try { + final int pos = this.readOnlyBuffer.position(); + final MarketData data = MarketDataUtil.readAndDecode(this.readOnlyBuffer); + readPos.addAndGet(this.readOnlyBuffer.position() - pos); + log.info(" client: {}, read/write gap: {}, data: {}", name, writePos.get() - readPos.get(), data); + } catch (IndexOutOfBoundsException e) { + this.readOnlyBuffer.clear(); // ring buffer + } catch (Exception e) { + log.error(" cannot read from buffer {}", readOnlyBuffer); + } + } + if (this.readOnlyBuffer.remaining() == 0) { + this.readOnlyBuffer.clear(); // ring buffer + } + } + + void read() { + readerThreadPool.scheduleAtFixedRate(this::readTradeData, 1, 1, TimeUnit.SECONDS); + } + } + + private Client newClient(String name) { + return new Client(name, buffer); + } + + private void writeTradeData(MarketData data) { + try { + final int writtenBytes = MarketDataUtil.encodeAndWrite(buffer, data); + writePos.addAndGet(writtenBytes); + log.info(" buffer size remaining: %{}, data: {}", 100 * buffer.remaining() / buffer.capacity(), data); + } catch (IndexOutOfBoundsException e) { + buffer.clear(); // ring buffer + writeTradeData(data); + } catch (Exception e) { + log.error(" cannot write into buffer {}", buffer); + } + } + + private void run(MarketDataSource source) { + writerThread.scheduleAtFixedRate(() -> { + if (source.hasNext()) { + writeTradeData(source.next()); + } + }, 1, 2, TimeUnit.SECONDS); + } + + public static void main(String[] args) { + MarketDataStreamServer server = new MarketDataStreamServer(); + Client client1 = server.newClient("client1"); + client1.read(); + Client client2 = server.newClient("client2"); + client2.read(); + server.run(new MarketDataSource()); + } + +} diff --git a/libraries-7/src/main/java/com/baeldung/sbe/MarketDataUtil.java b/libraries-7/src/main/java/com/baeldung/sbe/MarketDataUtil.java new file mode 100644 index 0000000000..f85173e786 --- /dev/null +++ b/libraries-7/src/main/java/com/baeldung/sbe/MarketDataUtil.java @@ -0,0 +1,78 @@ +package com.baeldung.sbe; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +import org.agrona.concurrent.UnsafeBuffer; + +import com.baeldung.sbe.stub.MessageHeaderDecoder; +import com.baeldung.sbe.stub.MessageHeaderEncoder; +import com.baeldung.sbe.stub.TradeDataDecoder; +import com.baeldung.sbe.stub.TradeDataEncoder; + +public class MarketDataUtil { + + public static int encodeAndWrite(ByteBuffer buffer, MarketData marketData) { + + final int pos = buffer.position(); + + final UnsafeBuffer directBuffer = new UnsafeBuffer(buffer); + final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder(); + final TradeDataEncoder dataEncoder = new TradeDataEncoder(); + + final BigDecimal priceDecimal = BigDecimal.valueOf(marketData.getPrice()); + final int priceMantis = priceDecimal.scaleByPowerOfTen(priceDecimal.scale()) + .intValue(); + final int priceExponent = priceDecimal.scale() * -1; + + final TradeDataEncoder encoder = dataEncoder.wrapAndApplyHeader(directBuffer, pos, headerEncoder); + encoder.amount(marketData.getAmount()); + encoder.quote() + .market(marketData.getMarket()) + .currency(marketData.getCurrency()) + .symbol(marketData.getSymbol()) + .price() + .mantissa(priceMantis) + .exponent((byte) priceExponent); + + // set position + final int encodedLength = headerEncoder.encodedLength() + encoder.encodedLength(); + buffer.position(pos + encodedLength); + return encodedLength; + } + + public static MarketData readAndDecode(ByteBuffer buffer) { + + final int pos = buffer.position(); + + final UnsafeBuffer directBuffer = new UnsafeBuffer(buffer); + final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder(); + final TradeDataDecoder dataDecoder = new TradeDataDecoder(); + + dataDecoder.wrapAndApplyHeader(directBuffer, pos, headerDecoder); + + // set position + final int encodedLength = headerDecoder.encodedLength() + dataDecoder.encodedLength(); + buffer.position(pos + encodedLength); + + final double price = BigDecimal.valueOf(dataDecoder.quote() + .price() + .mantissa()) + .scaleByPowerOfTen(dataDecoder.quote() + .price() + .exponent()) + .doubleValue(); + + return MarketData.builder() + .amount(dataDecoder.amount()) + .symbol(dataDecoder.quote() + .symbol()) + .market(dataDecoder.quote() + .market()) + .currency(dataDecoder.quote() + .currency()) + .price(price) + .build(); + } + +} diff --git a/libraries-7/src/main/resources/schema.xml b/libraries-7/src/main/resources/schema.xml new file mode 100644 index 0000000000..010ccd276b --- /dev/null +++ b/libraries-7/src/main/resources/schema.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + 0 + 1 + + + + + + + + 0 + 1 + + + + + + + + + + + + + diff --git a/libraries-7/src/test/java/com/baeldung/test/EncodeDecodeMarketDataUnitTest.java b/libraries-7/src/test/java/com/baeldung/test/EncodeDecodeMarketDataUnitTest.java new file mode 100644 index 0000000000..5c6c5118a9 --- /dev/null +++ b/libraries-7/src/test/java/com/baeldung/test/EncodeDecodeMarketDataUnitTest.java @@ -0,0 +1,75 @@ +package com.baeldung.test; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +import org.agrona.concurrent.UnsafeBuffer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.baeldung.sbe.MarketData; +import com.baeldung.sbe.stub.Currency; +import com.baeldung.sbe.stub.Market; +import com.baeldung.sbe.stub.MessageHeaderDecoder; +import com.baeldung.sbe.stub.MessageHeaderEncoder; +import com.baeldung.sbe.stub.TradeDataDecoder; +import com.baeldung.sbe.stub.TradeDataEncoder; + +public class EncodeDecodeMarketDataUnitTest { + + private MarketData marketData; + + @BeforeEach + public void setup() { + marketData = new MarketData(2, 128.99, Market.NYSE, Currency.USD, "IBM"); + } + + @Test + public void givenMarketData_whenEncode_thenDecodedValuesMatch() { + // our buffer to write encoded data, initial cap. 128 bytes + UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocate(128)); + // necessary encoders + MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder(); + TradeDataEncoder dataEncoder = new TradeDataEncoder(); + // we parse price data (double) into two parts: mantis and exponent + BigDecimal priceDecimal = BigDecimal.valueOf(marketData.getPrice()); + int priceMantissa = priceDecimal.scaleByPowerOfTen(priceDecimal.scale()) + .intValue(); + int priceExponent = priceDecimal.scale() * -1; + // encode data + TradeDataEncoder encoder = dataEncoder.wrapAndApplyHeader(buffer, 0, headerEncoder); + encoder.amount(marketData.getAmount()); + encoder.quote() + .market(marketData.getMarket()) + .currency(marketData.getCurrency()) + .symbol(marketData.getSymbol()) + .price() + .mantissa(priceMantissa) + .exponent((byte) priceExponent); + + // necessary decoders + MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder(); + TradeDataDecoder dataDecoder = new TradeDataDecoder(); + // decode data + dataDecoder.wrapAndApplyHeader(buffer, 0, headerDecoder); + // decode price data (from mantissa and exponent) into a double + double price = BigDecimal.valueOf(dataDecoder.quote() + .price() + .mantissa()) + .scaleByPowerOfTen(dataDecoder.quote() + .price() + .exponent()) + .doubleValue(); + // ensure we have the exact same values + Assertions.assertEquals(2, dataDecoder.amount()); + Assertions.assertEquals("IBM", dataDecoder.quote() + .symbol()); + Assertions.assertEquals(Market.NYSE, dataDecoder.quote() + .market()); + Assertions.assertEquals(Currency.USD, dataDecoder.quote() + .currency()); + Assertions.assertEquals(128.99, price); + } + +} diff --git a/pom.xml b/pom.xml index 2817850b89..232459c32b 100644 --- a/pom.xml +++ b/pom.xml @@ -433,6 +433,7 @@ language-interop libraries-2 libraries-3 + libraries-7 libraries-apache-commons libraries-apache-commons-collections