load(String aggregateId) {
+ return Mono.fromCallable(() -> aggregates.computeIfAbsent(aggregateId, (k) -> {
+ Book book = new Book(aggregateId);
+ //subscribe query projection for book events
+ book.aggregateEvents().flatMap(bookQueryRepository::updateProjection).subscribe();
+ return book;
+ }));
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/bus/CommandBus.java b/src/main/java/io/bux/matchingengine/domain/bus/CommandBus.java
new file mode 100644
index 0000000..92519ee
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/bus/CommandBus.java
@@ -0,0 +1,125 @@
+package io.bux.matchingengine.domain.bus;
+
+import io.bux.matchingengine.cqrs.Command;
+import io.bux.matchingengine.cqrs.SourcingEvent;
+import io.bux.matchingengine.domain.BookAggregateRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
+
+import java.time.Duration;
+import java.util.function.Consumer;
+import javax.annotation.PreDestroy;
+
+/**
+ * Routes commands to corresponding aggregate.
+ *
+ * Routes command for district assets/aggregates in parallel, but routes commands withing one aggregate sequentially.
+ *
+ * Locking is done by Reactor {@see tryAcquire}
+ *
+ * Is fire and forget, canceling subscription will not change execution flow, but subscriber has option to "stay" and
+ * get signaled once corresponding event has been materialized or if execution has failed.
+ *
+ * @author Stefan Dragisic
+ */
+@Component
+public class CommandBus {
+
+ private final Logger logger = LoggerFactory.getLogger(CommandBus.class);
+
+ private final Sinks.Many commandExecutor = Sinks.many()
+ .unicast()
+ .onBackpressureBuffer();
+
+ private final Disposable commandExecutorDisposable;
+
+ /**
+ * Instantiate command bus by subscribing to hot stream on which commands are published
+ *
+ * @param aggregateRepository
+ */
+ public CommandBus(BookAggregateRepository aggregateRepository) {
+
+ commandExecutorDisposable = commandExecutor.asFlux()
+ .doOnNext(n -> logger.debug("{} being executed....",
+ n.getCommand().getClass()
+ .getSimpleName()))
+ .groupBy(cw -> cw.getCommand().aggregateId())
+ .flatMap(aggregateCommands -> aggregateCommands
+ .concatMap(cmd -> aggregateRepository
+ .load(cmd.getCommand().aggregateId())
+ .flatMap(aggregate -> aggregate.routeCommand(cmd.getCommand()) //potential performance boost - flatMapSequential
+ .flatMap(event -> aggregate.routeEvent(
+ event)
+ .then(cmd.signalMaterialized(
+ event)))
+ .doOnError(cmd::signalError)
+
+
+ )))
+ .subscribe();
+ }
+
+ /**
+ * Sends a command that is then routed to command handler at corresponding aggregate. Routes command for district
+ * assets/aggregates in parallel, but routes commands withing one aggregate sequentially.
+ *
+ * Is fire and forget - canceling execution does not change execution flow.
+ *
+ * @param command to send
+ * @return sourcing event once it has been materialized
+ */
+ public Mono sendCommand(Command command) {
+ return Mono.defer(() -> {
+ Sinks.One actionResult = Sinks.one();
+ return Mono.fromRunnable(() -> commandExecutor.emitNext(new CommandWrapper(command,
+ actionResult::tryEmitValue,
+ actionResult::tryEmitError),
+ (signalType, emitResult) -> emitResult
+ .equals(Sinks.EmitResult.FAIL_NON_SERIALIZED)))
+ .subscribeOn(Schedulers.parallel())
+ .then(actionResult.asMono());
+ });
+ }
+
+ /**
+ * Shutdown command bus on bean destruction
+ */
+ @PreDestroy
+ public void destroy() {
+ commandExecutor.tryEmitComplete();
+ commandExecutorDisposable.dispose();
+ }
+
+ private static class CommandWrapper {
+
+ private final Command command;
+ private final Consumer signalDone;
+ private final Consumer signalError;
+
+ public CommandWrapper(Command command,
+ Consumer action,
+ Consumer signalError) {
+ this.command = command;
+ this.signalDone = action;
+ this.signalError = signalError;
+ }
+
+ public Command getCommand() {
+ return command;
+ }
+
+ public Mono signalMaterialized(SourcingEvent event) {
+ return Mono.fromRunnable(() -> signalDone.accept(event));
+ }
+
+ public void signalError(Throwable t) {
+ signalError.accept(t);
+ }
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/command/CancelOrderCommand.java b/src/main/java/io/bux/matchingengine/domain/command/CancelOrderCommand.java
new file mode 100644
index 0000000..4617002
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/command/CancelOrderCommand.java
@@ -0,0 +1,28 @@
+package io.bux.matchingengine.domain.command;
+
+import io.bux.matchingengine.cqrs.Command;
+import org.springframework.lang.NonNull;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+/**
+ * Command to request order cancellation
+ * @author Stefan Dragisic
+ */
+public record CancelOrderCommand(String aggregateId, UUID commandId, long orderId, Boolean cancelAll, BigDecimal newAmount)
+ implements Command {
+
+ public CancelOrderCommand(
+ @NonNull String aggregateId,
+ @NonNull UUID commandId,
+ @NonNull long orderId,
+ @NonNull Boolean cancelAll,
+ @NonNull BigDecimal newAmount) {
+ this.aggregateId = aggregateId;
+ this.commandId = commandId;
+ this.orderId = orderId;
+ this.cancelAll = cancelAll;
+ this.newAmount = newAmount;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/command/MakeOrderCommand.java b/src/main/java/io/bux/matchingengine/domain/command/MakeOrderCommand.java
new file mode 100644
index 0000000..1c8ccad
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/command/MakeOrderCommand.java
@@ -0,0 +1,31 @@
+package io.bux.matchingengine.domain.command;
+
+import io.bux.matchingengine.cqrs.Command;
+import io.bux.matchingengine.domain.query.OrderType;
+import org.springframework.lang.NonNull;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+/**
+ * Command to place new order
+ *
+ * @author Stefan Dragisic
+ */
+public record MakeOrderCommand(String aggregateId, UUID commandId,
+ OrderType type, BigDecimal amount, BigDecimal price)
+ implements Command {
+
+ public MakeOrderCommand(
+ @NonNull String aggregateId,
+ @NonNull UUID commandId,
+ @NonNull OrderType type,
+ @NonNull BigDecimal amount,
+ @NonNull BigDecimal price) {
+ this.aggregateId = aggregateId;
+ this.commandId = commandId;
+ this.type = type;
+ this.amount = amount;
+ this.price = price;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/engine/MatchingEngine.java b/src/main/java/io/bux/matchingengine/domain/engine/MatchingEngine.java
new file mode 100644
index 0000000..c9c1710
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/engine/MatchingEngine.java
@@ -0,0 +1,274 @@
+package io.bux.matchingengine.domain.engine;
+
+import io.bux.matchingengine.cqrs.UpdateEvent;
+import io.bux.matchingengine.domain.query.OrderType;
+import io.bux.matchingengine.domain.engine.events.OrderCanceledEvent;
+import io.bux.matchingengine.domain.engine.events.OrderMatchedEvent;
+import io.bux.matchingengine.domain.engine.events.OrderPlacedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Single threaded matching engine - any thread synchronization should be done external.
+ *
+ * Uses Max-Heap and Min-Heap {@link TreeSet}
+ *
+ * Time complexity for critical operations are as.
+ * - Add – O(log N)
+ * - Cancel – O(1)
+ *
+ * Asynchronous - no immediate return values, all events are publishes to local event bus {@link
+ * MatchingEngine#engineEventSink}
+ *
+ * @author Stefan Dragisic
+ */
+public class MatchingEngine {
+
+ private final Logger logger = LoggerFactory.getLogger(MatchingEngine.class);
+ private final TreeSet bids;
+ private final TreeSet asks;
+ private final Map orders;
+ private final AtomicLong term;
+ Sinks.Many engineEventSink = Sinks.many().multicast().onBackpressureBuffer();
+ Flux engineEventFlux = engineEventSink.asFlux()
+ .doOnNext(n -> logger.info(n.toString()))
+ .publish()
+ .autoConnect();
+
+ public MatchingEngine() {
+ this.bids = new TreeSet<>(MatchingEngine::compareBids);
+ this.asks = new TreeSet<>(MatchingEngine::compareAsks);
+
+ this.orders = new HashMap<>();
+ this.term = new AtomicLong(0);
+ }
+
+ private static int compareBids(Order a, Order b) {
+ int result = b.getPrice().compareTo(a.getPrice());
+ if (result != 0) {
+ return result;
+ }
+
+ return Long.compare(a.getTerm(), b.getTerm());
+ }
+
+ private static int compareAsks(Order a, Order b) {
+ int result = a.getPrice().compareTo(b.getPrice());
+ if (result != 0) {
+ return result;
+ }
+
+ return Long.compare(a.getTerm(), b.getTerm());
+ }
+
+ /**
+ * All engine execution events are published to this bus
+ *
+ * @return local engine event bus - hot stream
+ */
+ public Flux engineEvents() {
+ return engineEventFlux;
+ }
+
+ /**
+ * Places order into matching engine
+ *
+ * @param orderId - order identifier
+ * @param aggregateId - asset name / aggregate identifier
+ * @param entryTimestamp - time when the system registered order
+ * @param type - direction - can be either "BUY" or "SELL"
+ * @param price - a price for limit order
+ * @param amount - amount of asset to fill by order
+ */
+ public void placeOrder(long orderId, String aggregateId, Instant entryTimestamp, OrderType type, BigDecimal price,
+ BigDecimal amount) {
+ if (orders.containsKey(orderId)) {
+ return;
+ }
+ if (type == OrderType.BUY) {
+ buy(orderId, aggregateId, entryTimestamp, price, amount);
+ } else {
+ sell(orderId, aggregateId, entryTimestamp, price, amount);
+ }
+ }
+
+ private void buy(long incomingId, String aggregateId, Instant entryTimestamp, BigDecimal incomingPrice,
+ BigDecimal incomingAmount) {
+ while (!asks.isEmpty()) {
+ Order resting = asks.first();
+
+ BigDecimal restingPrice = resting.getPrice();
+ if (restingPrice.compareTo(incomingPrice) > 0) {
+ break;
+ }
+
+ long restingId = resting.getId();
+
+ BigDecimal restingAmount = resting.getRemainingAmount();
+
+ if (restingAmount.compareTo(incomingAmount) > 0) {
+ resting.reduce(incomingAmount);
+
+ engineEventSink.tryEmitNext(new OrderMatchedEvent(restingId,
+ aggregateId,
+ entryTimestamp,
+ incomingId,
+ OrderType.BUY,
+ incomingPrice,
+ restingPrice,
+ incomingAmount,
+ restingAmount,
+ resting.getRemainingAmount()));
+
+ return;
+ }
+
+ asks.remove(resting);
+ orders.remove(restingId);
+
+ engineEventSink.tryEmitNext(new OrderMatchedEvent(restingId,
+ aggregateId,
+ entryTimestamp,
+ incomingId,
+ OrderType.BUY,
+ incomingPrice,
+ restingPrice,
+ incomingAmount,
+ restingAmount,
+ BigDecimal.ZERO));
+
+
+ incomingAmount = incomingAmount.subtract(restingAmount);
+
+ if (incomingAmount.compareTo(BigDecimal.ZERO) == 0) {
+ return;
+ }
+ }
+
+ add(incomingId, aggregateId, entryTimestamp, OrderType.BUY, incomingPrice, incomingAmount, bids);
+ }
+
+ private void sell(long incomingId, String aggregateId, Instant entryTimestamp, BigDecimal incomingPrice,
+ BigDecimal incomingAmount) {
+ while (!bids.isEmpty()) {
+ Order resting = bids.first();
+
+ BigDecimal restingPrice = resting.getPrice();
+ if (restingPrice.compareTo(incomingPrice) < 0) {
+ break;
+ }
+
+ long restingId = resting.getId();
+
+ BigDecimal restingAmount = resting.getRemainingAmount();
+ if (restingAmount.compareTo(incomingAmount) > 0) {
+ resting.reduce(incomingAmount);
+
+ engineEventSink.tryEmitNext(new OrderMatchedEvent(restingId,
+ aggregateId,
+ entryTimestamp,
+ incomingId,
+ OrderType.SELL,
+ incomingPrice,
+ restingPrice,
+ incomingAmount,
+ restingAmount,
+ resting.getRemainingAmount()));
+
+ return;
+ }
+
+ bids.remove(resting);
+ orders.remove(restingId);
+
+ engineEventSink.tryEmitNext(new OrderMatchedEvent(restingId,
+ aggregateId,
+ entryTimestamp,
+ incomingId,
+ OrderType.SELL,
+ incomingPrice,
+ restingPrice,
+ incomingAmount,
+ restingAmount,
+ BigDecimal.ZERO));
+
+ incomingAmount = incomingAmount.subtract(restingAmount);
+ if (incomingAmount.compareTo(BigDecimal.ZERO) == 0) {
+ return;
+ }
+ }
+
+ add(incomingId, aggregateId, entryTimestamp, OrderType.SELL, incomingPrice, incomingAmount, asks);
+ }
+
+ private void add(long orderId, String aggregateId, Instant entryTimestamp, OrderType type, BigDecimal price,
+ BigDecimal amount, TreeSet queue) {
+ Order order = new Order(orderId, type, price, amount, term.incrementAndGet());
+
+ queue.add(order);
+ orders.put(orderId, order);
+
+ engineEventSink.tryEmitNext(new OrderPlacedEvent(orderId,
+ aggregateId,
+ entryTimestamp,
+ type,
+ price,
+ amount));
+ }
+
+ /**
+ * Cancels full amount of order.
+ *
+ * @param orderId - order identifier
+ * @param aggregateId - asset name / aggregate identifier
+ */
+ public void cancelAll(long orderId, String aggregateId) {
+ cancel(orderId, aggregateId, BigDecimal.ZERO);
+ }
+
+ /**
+ * Partially cancels order and sets new amount to be filled
+ *
+ * @param orderId - order identifier
+ * @param aggregateId - asset name / aggregate identifier
+ * @param newAmount - new amount to replace previous amount
+ */
+ public void cancel(long orderId, String aggregateId, BigDecimal newAmount) {
+ Order order = orders.get(orderId);
+ if (order == null) {
+ return;
+ }
+
+ BigDecimal remainingAmount = order.getRemainingAmount();
+
+ if (newAmount.compareTo(remainingAmount) >= 0) {
+ return;
+ }
+
+ if (newAmount.compareTo(BigDecimal.ZERO) > 0) {
+ order.resize(newAmount);
+ } else {
+ TreeSet queue = order.type() == OrderType.BUY ? bids : asks;
+
+ queue.remove(order);
+ orders.remove(orderId);
+ }
+
+ engineEventSink.tryEmitNext(new OrderCanceledEvent(orderId,
+ aggregateId,
+ order.type(),
+ remainingAmount.subtract(newAmount),
+ newAmount));
+ }
+
+ ;
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/engine/Order.java b/src/main/java/io/bux/matchingengine/domain/engine/Order.java
new file mode 100644
index 0000000..815e38d
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/engine/Order.java
@@ -0,0 +1,56 @@
+package io.bux.matchingengine.domain.engine;
+
+import io.bux.matchingengine.domain.query.OrderType;
+
+import java.math.BigDecimal;
+
+/**
+ * Representation of order used by {@link MatchingEngine}
+ *
+ * @author Stefan Dragisic
+ */
+public class Order {
+ private final OrderType type;
+ private final BigDecimal price;
+ private final long term;
+ private final long id;
+
+ private BigDecimal remainingAmount;
+
+ public Order(long id, OrderType type, BigDecimal price, BigDecimal amount, long term) {
+ this.id = id;
+ this.type = type;
+ this.price = price;
+ this.term = term;
+
+ this.remainingAmount = amount;
+ }
+
+ public BigDecimal getPrice() {
+ return price;
+ }
+
+ public OrderType type() {
+ return type;
+ }
+
+ public BigDecimal getRemainingAmount() {
+ return remainingAmount;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public void reduce(BigDecimal amount) {
+ remainingAmount = remainingAmount.subtract(amount);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void resize(BigDecimal newAmount) {
+ remainingAmount = newAmount;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/engine/events/OrderCanceledEvent.java b/src/main/java/io/bux/matchingengine/domain/engine/events/OrderCanceledEvent.java
new file mode 100644
index 0000000..14da837
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/engine/events/OrderCanceledEvent.java
@@ -0,0 +1,20 @@
+package io.bux.matchingengine.domain.engine.events;
+
+import io.bux.matchingengine.cqrs.UpdateEvent;
+import io.bux.matchingengine.domain.query.OrderType;
+
+import java.math.BigDecimal;
+
+/**
+ * Update event that signals that order has been canceled.
+ *
+ * @author Stefan Dragisic
+ */
+public record OrderCanceledEvent(
+ long orderId,
+ String aggregateId,
+ OrderType orderType,
+ BigDecimal canceledAmount,
+ BigDecimal remainingAmount) implements UpdateEvent {
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/bux/matchingengine/domain/engine/events/OrderMatchedEvent.java b/src/main/java/io/bux/matchingengine/domain/engine/events/OrderMatchedEvent.java
new file mode 100644
index 0000000..073a212
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/engine/events/OrderMatchedEvent.java
@@ -0,0 +1,26 @@
+package io.bux.matchingengine.domain.engine.events;
+
+import io.bux.matchingengine.cqrs.UpdateEvent;
+import io.bux.matchingengine.domain.query.OrderType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+/**
+ * Update event that signals that order has been matched.
+ *
+ * @author Stefan Dragisic
+ */
+public record OrderMatchedEvent(
+ long restingId,
+ String aggregateId,
+ Instant entryTimestamp,
+ long incomingId,
+ OrderType orderType,
+ BigDecimal incomingPrice,
+ BigDecimal restingPrice,
+ BigDecimal incomingAmount,
+ BigDecimal previousRestingAmount,
+ BigDecimal restingRemainingAmount) implements UpdateEvent {
+
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/engine/events/OrderPlacedEvent.java b/src/main/java/io/bux/matchingengine/domain/engine/events/OrderPlacedEvent.java
new file mode 100644
index 0000000..6bfd51b
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/engine/events/OrderPlacedEvent.java
@@ -0,0 +1,22 @@
+package io.bux.matchingengine.domain.engine.events;
+
+import io.bux.matchingengine.cqrs.UpdateEvent;
+import io.bux.matchingengine.domain.query.OrderType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+/**
+ * Update event that signals that order has been placed but not yet matched.
+ *
+ * @author Stefan Dragisic
+ */
+public record OrderPlacedEvent(
+ long orderId,
+ String aggregateId,
+ Instant timestamp,
+ OrderType orderType,
+ BigDecimal price,
+ BigDecimal amount) implements UpdateEvent {
+
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/events/CancellationRequestedEvent.java b/src/main/java/io/bux/matchingengine/domain/events/CancellationRequestedEvent.java
new file mode 100644
index 0000000..8ed396a
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/events/CancellationRequestedEvent.java
@@ -0,0 +1,32 @@
+package io.bux.matchingengine.domain.events;
+
+import io.bux.matchingengine.cqrs.SourcingEvent;
+import org.springframework.lang.NonNull;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+/**
+ * Event that user requested cancellation of order
+ * Not used - POC
+ *
+ * @author Stefan Dragisic
+ */
+public record CancellationRequestedEvent(String aggregateId, UUID eventId,
+ long orderId,
+ Boolean cancelAll, BigDecimal newAmount)
+ implements SourcingEvent {
+
+ public CancellationRequestedEvent(
+ @NonNull String aggregateId,
+ @NonNull UUID eventId,
+ @NonNull long orderId,
+ @NonNull Boolean cancelAll,
+ @NonNull BigDecimal newAmount) {
+ this.aggregateId = aggregateId;
+ this.eventId = eventId;
+ this.orderId = orderId;
+ this.cancelAll = cancelAll;
+ this.newAmount = newAmount;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/events/OrderAcceptedEvent.java b/src/main/java/io/bux/matchingengine/domain/events/OrderAcceptedEvent.java
new file mode 100644
index 0000000..67a23af
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/events/OrderAcceptedEvent.java
@@ -0,0 +1,36 @@
+package io.bux.matchingengine.domain.events;
+
+import io.bux.matchingengine.cqrs.SourcingEvent;
+import io.bux.matchingengine.domain.query.OrderType;
+import org.springframework.lang.NonNull;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.UUID;
+
+/**
+ * Event that marks that order has passed validation phase, and order id is generated to be used for tracker.
+ *
+ * @author Stefan Dragisic
+ */
+public record OrderAcceptedEvent(String aggregateId, UUID eventId, long orderId,
+ OrderType type, BigDecimal amount, BigDecimal price, Instant entryTimestamp)
+ implements SourcingEvent {
+
+ public OrderAcceptedEvent(
+ @NonNull String aggregateId,
+ @NonNull UUID eventId,
+ @NonNull long orderId,
+ @NonNull OrderType type,
+ @NonNull BigDecimal amount,
+ @NonNull BigDecimal price,
+ @NonNull Instant entryTimestamp) {
+ this.aggregateId = aggregateId;
+ this.orderId = orderId;
+ this.eventId = eventId;
+ this.type = type;
+ this.amount = amount;
+ this.price = price;
+ this.entryTimestamp = entryTimestamp;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/events/OrderRejectedEvent.java b/src/main/java/io/bux/matchingengine/domain/events/OrderRejectedEvent.java
new file mode 100644
index 0000000..13a6370
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/events/OrderRejectedEvent.java
@@ -0,0 +1,34 @@
+package io.bux.matchingengine.domain.events;
+
+import io.bux.matchingengine.cqrs.SourcingEvent;
+import io.bux.matchingengine.domain.query.OrderType;
+import org.springframework.lang.NonNull;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+/**
+ * Event that marks that order didn't pass validation.
+ * Not used - POC
+ *
+ * @author Stefan Dragisic
+ */
+public record OrderRejectedEvent(String aggregateId, UUID eventId,
+ OrderType type, BigDecimal amount, BigDecimal price, String cause)
+ implements SourcingEvent {
+
+ public OrderRejectedEvent(
+ @NonNull String aggregateId,
+ @NonNull UUID eventId,
+ @NonNull OrderType type,
+ @NonNull BigDecimal amount,
+ @NonNull BigDecimal price,
+ @NonNull String cause) {
+ this.aggregateId = aggregateId;
+ this.eventId = eventId;
+ this.type = type;
+ this.amount = amount;
+ this.price = price;
+ this.cause = cause;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/query/BookQueryRepository.java b/src/main/java/io/bux/matchingengine/domain/query/BookQueryRepository.java
new file mode 100644
index 0000000..c7720a6
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/query/BookQueryRepository.java
@@ -0,0 +1,100 @@
+package io.bux.matchingengine.domain.query;
+
+import io.bux.matchingengine.cqrs.Event;
+import io.bux.matchingengine.cqrs.QueryRepository;
+import io.bux.matchingengine.domain.engine.events.OrderCanceledEvent;
+import io.bux.matchingengine.domain.engine.events.OrderMatchedEvent;
+import io.bux.matchingengine.domain.engine.events.OrderPlacedEvent;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Thread-safe implementation of {@link QueryRepository} used to store order & book projections.
+ *
+ * @author Stefan Dragisic
+ */
+@Component
+public class BookQueryRepository implements QueryRepository {
+
+ private final ConcurrentHashMap projection = new ConcurrentHashMap<>();
+
+ /**
+ * Returns current order projection
+ *
+ * @param orderId - order identifier
+ * @return - materialized projection
+ */
+ @Override
+ public Mono getOrder(long orderId) {
+ return Mono.fromCallable(() -> projection.get(orderId));
+ }
+
+ /**
+ * Updates projection with event. Repository uses this event to create/maintain projections.
+ *
+ * @param event - materialized event
+ */
+ @Override
+ public Mono updateProjection(Event event) {
+ return (switch (event) {
+ case OrderPlacedEvent evt -> handleOrderPlacedEvent(evt);
+ case OrderMatchedEvent evt -> handleOrderMatchedEvent(evt);
+ case OrderCanceledEvent evt -> handleOrderCanceledEvent(evt);
+ default -> Mono.empty();
+ }).subscribeOn(Schedulers.parallel())
+ .then();
+ }
+
+ private Mono handleOrderPlacedEvent(OrderPlacedEvent evt) {
+ return Mono.fromCallable(() -> projection.computeIfAbsent(evt.orderId(), orderId ->
+ new OrderEntry(orderId,
+ evt.timestamp(),
+ evt.aggregateId(),
+ evt.price(),
+ evt.amount(),
+ evt.orderType(),
+ new CopyOnWriteArrayList<>(),
+ evt.amount())));
+ }
+
+ private Mono handleOrderMatchedEvent(OrderMatchedEvent evt) {
+ return Mono.fromCallable(() -> {
+ //update previous
+ projection.computeIfPresent(evt.restingId(), (key, order) -> {
+ order.setPendingAmount(evt.restingRemainingAmount());
+ order.trades().add(
+ new OrderTradeEntry(evt.incomingId(),
+ evt.incomingAmount(),
+ evt.restingPrice()));
+ return order;
+ });
+ //enter new
+ return projection.computeIfAbsent(evt.incomingId(), incomingId ->
+ new OrderEntry(incomingId,
+ evt.entryTimestamp(),
+ evt.aggregateId(),
+ evt.incomingPrice(),
+ evt.incomingAmount(),
+ evt.orderType(),
+ new CopyOnWriteArrayList<>(List.of(new OrderTradeEntry(
+ evt.restingId(),
+ evt.previousRestingAmount().subtract(evt.restingRemainingAmount()),
+ evt.restingPrice()
+ ))),
+ evt.incomingAmount()
+ .subtract(evt.previousRestingAmount().subtract(evt.restingRemainingAmount()))));
+ });
+ }
+
+ private Mono handleOrderCanceledEvent(OrderCanceledEvent evt) {
+ return Mono.fromCallable(() -> projection.computeIfPresent(evt.orderId(), (key, order) -> {
+ order.setPendingAmount(evt.remainingAmount());
+ return order;
+ }));
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/query/OrderEntry.java b/src/main/java/io/bux/matchingengine/domain/query/OrderEntry.java
new file mode 100644
index 0000000..adc4870
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/query/OrderEntry.java
@@ -0,0 +1,78 @@
+package io.bux.matchingengine.domain.query;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.List;
+
+/**
+ * @author Stefan Dragisic
+ */
+public class OrderEntry {
+
+ private final long orderId;
+ private final Instant entryTimestamp;
+ private final String asset;
+ private final BigDecimal price;
+ private final BigDecimal amount;
+ private final OrderType direction;
+ private final List trades;
+ private BigDecimal pendingAmount;
+
+
+ public OrderEntry(
+ long orderId,
+ Instant entryTimestamp,
+ String asset,
+ BigDecimal price,
+ BigDecimal amount,
+ OrderType direction,
+ List trades,
+ BigDecimal pendingAmount) {
+ this.orderId = orderId;
+ this.entryTimestamp = entryTimestamp;
+ this.asset = asset;
+ this.price = price;
+ this.amount = amount;
+ this.direction = direction;
+ this.trades = trades;
+ this.pendingAmount = pendingAmount;
+ }
+
+ public void setPendingAmount(BigDecimal pendingAmount) {
+ this.pendingAmount = pendingAmount;
+ }
+
+ public long orderId() {
+ return orderId;
+ }
+
+ public Instant entryTimestamp() {
+ return entryTimestamp;
+ }
+
+ public String asset() {
+ return asset;
+ }
+
+ public BigDecimal price() {
+ return price;
+ }
+
+ public BigDecimal amount() {
+ return amount;
+ }
+
+ public OrderType direction() {
+ return direction;
+ }
+
+ public List trades() {
+ return trades;
+ }
+
+ public BigDecimal pendingAmount() {
+ return pendingAmount;
+ }
+
+}
+
diff --git a/src/main/java/io/bux/matchingengine/domain/query/OrderTradeEntry.java b/src/main/java/io/bux/matchingengine/domain/query/OrderTradeEntry.java
new file mode 100644
index 0000000..d8ff48b
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/query/OrderTradeEntry.java
@@ -0,0 +1,19 @@
+package io.bux.matchingengine.domain.query;
+
+import java.math.BigDecimal;
+
+public record OrderTradeEntry(
+ long orderId,
+ BigDecimal amount,
+ BigDecimal price
+) {
+
+ public OrderTradeEntry(
+ long orderId,
+ BigDecimal amount,
+ BigDecimal price) {
+ this.orderId = orderId;
+ this.price = price;
+ this.amount = amount;
+ }
+}
diff --git a/src/main/java/io/bux/matchingengine/domain/query/OrderType.java b/src/main/java/io/bux/matchingengine/domain/query/OrderType.java
new file mode 100644
index 0000000..86b1c20
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/domain/query/OrderType.java
@@ -0,0 +1,9 @@
+package io.bux.matchingengine.domain.query;
+
+/**
+ * @author Stefan Dragisic
+ */
+public enum OrderType {
+ BUY,
+ SELL
+}
diff --git a/src/main/java/io/bux/matchingengine/web/TradingController.java b/src/main/java/io/bux/matchingengine/web/TradingController.java
new file mode 100644
index 0000000..bf0f48f
--- /dev/null
+++ b/src/main/java/io/bux/matchingengine/web/TradingController.java
@@ -0,0 +1,161 @@
+package io.bux.matchingengine.web;
+
+import io.bux.matchingengine.api.protobuf.OrderStatusResponse;
+import io.bux.matchingengine.api.protobuf.PlaceOrderRequest;
+import io.bux.matchingengine.api.protobuf.Trade;
+import io.bux.matchingengine.cqrs.Event;
+import io.bux.matchingengine.cqrs.SourcingEvent;
+import io.bux.matchingengine.domain.Book;
+import io.bux.matchingengine.domain.BookAggregateRepository;
+import io.bux.matchingengine.domain.bus.CommandBus;
+import io.bux.matchingengine.domain.command.CancelOrderCommand;
+import io.bux.matchingengine.domain.command.MakeOrderCommand;
+import io.bux.matchingengine.domain.events.OrderAcceptedEvent;
+import io.bux.matchingengine.domain.query.BookQueryRepository;
+import io.bux.matchingengine.domain.query.OrderEntry;
+import io.bux.matchingengine.domain.query.OrderType;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Implements REST Endpoints to place, get or cancel order.
+ *
+ * @author Stefan Dragisic
+ * @author bux
+ */
+@RestController
+public class TradingController {
+
+ private final CommandBus commandBus;
+ private final BookAggregateRepository bookAggregateRepository;
+ private final BookQueryRepository bookQueryRepository;
+
+ public TradingController(CommandBus commandBus,
+ BookAggregateRepository bookAggregateRepository,
+ BookQueryRepository bookQueryRepository) {
+ this.commandBus = commandBus;
+ this.bookAggregateRepository = bookAggregateRepository;
+ this.bookQueryRepository = bookQueryRepository;
+ }
+
+ /**
+ * Places order into trading system
+ *
+ * @param request user request to place order
+ * @return order status
+ */
+ @PostMapping("/orders")
+ public Mono placeOrder(@RequestBody PlaceOrderRequest request) {
+ return commandBus.sendCommand(toMakeOrderCommand(request))
+ .cast(OrderAcceptedEvent.class)
+ .flatMap(this::getOrderProjection)
+ .map(this::toOrderStatus);
+ }
+
+ /**
+ * Not used - POC
+ * Intended to UI or client applications to mantain their own projection
+ *
+ * @param asset
+ * @return streams all events from aggregate
+ */
+ @GetMapping(value = "/book/{asset}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux bookEvents(@PathVariable String asset) {
+ return bookAggregateRepository.load(asset)
+ .flatMapMany(Book::aggregateEvents);
+ }
+
+ /**
+ * Retrieves order from projection
+ *
+ * @param orderId - order identifier
+ * @return order status
+ */
+ @GetMapping("/orders/{orderId}")
+ public Mono getOrder(@PathVariable Long orderId) {
+ return bookQueryRepository.getOrder(orderId)
+ .map(this::toOrderStatus);
+ }
+
+ /**
+ * POC
+ * Cancels pending order from book.
+ * @param orderId - order identifier
+ * @return response OK or error with error message
+ */
+ @PostMapping("/orders/{orderId}/cancel")
+ public Mono> cancelOrder(@PathVariable Long orderId) {
+ return bookQueryRepository.getOrder(orderId)
+ .flatMap(TradingController::validateOrderAmount)
+ .flatMap(this::sendCancelCommand)
+ .switchIfEmpty(Mono.error(new IllegalStateException(
+ "You can't cancel non-existing order.")))
+ .map(order -> ResponseEntity.accepted().body("OK"))
+ .onErrorResume(e -> Mono.just(ResponseEntity.badRequest().body(e.getMessage())));
+ }
+
+ private Mono sendCancelCommand(OrderEntry order) {
+ return commandBus.sendCommand(new CancelOrderCommand(order.asset(),
+ UUID.randomUUID(),
+ order.orderId(),
+ true,
+ BigDecimal.ZERO));
+ }
+
+ private Mono extends OrderEntry> getOrderProjection(OrderAcceptedEvent ev) {
+ return bookQueryRepository.getOrder(ev.orderId())
+ .repeatWhenEmpty(5, o -> o.delayElements(
+ Duration.ofMillis(50)));
+ }
+
+ private MakeOrderCommand toMakeOrderCommand(PlaceOrderRequest request) {
+ return new MakeOrderCommand(request.getAsset(),
+ UUID.randomUUID(),
+ OrderType.valueOf(request.getDirection().name()),
+ BigDecimal.valueOf(request.getAmount()),
+ BigDecimal.valueOf(request.getPrice()));
+ }
+
+ private OrderStatusResponse toOrderStatus(OrderEntry order) {
+ return OrderStatusResponse.newBuilder()
+ .setId(order.orderId())
+ .setTimestamp(order.entryTimestamp().toString())
+ .setAsset(order.asset())
+ .setAmount(order.amount().doubleValue())
+ .setPrice(order.price().doubleValue())
+ .setDirection(io.bux.matchingengine.api.protobuf.OrderType.valueOf(
+ order.direction().name()))
+ .addAllTrades(order.trades().stream()
+ .map(t -> Trade.newBuilder()
+ .setOrderId(t.orderId())
+ .setPrice(t.price()
+ .doubleValue())
+ .setAmount(t.amount()
+ .doubleValue())
+ .build())
+ .collect(Collectors.toList()))
+ .setPendingAmount(order.pendingAmount().doubleValue())
+ .build();
+ }
+
+ private static Mono extends OrderEntry> validateOrderAmount(OrderEntry order) {
+ if (order.price().compareTo(BigDecimal.ZERO) > 0) {
+ return Mono.just(order);
+ } else {
+ return Mono.error(new IllegalStateException("Order already executed."));
+ }
+ }
+
+}
diff --git a/src/main/resources/api.proto b/src/main/resources/api.proto
new file mode 100644
index 0000000..0c69d1b
--- /dev/null
+++ b/src/main/resources/api.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+package io.bux.matchingengine.api;
+
+option java_package = "io.bux.matchingengine.api.protobuf";
+option java_multiple_files = true;
+
+enum OrderType {
+ BUY = 0;
+ SELL = 1;
+}
+
+/**
+DTO to carry order request
+ */
+message PlaceOrderRequest {
+ string asset = 1;
+ double price = 2;
+ double amount = 3;
+ OrderType direction = 4;
+}
+
+/**
+DTO to carry order status response
+ */
+message OrderStatusResponse {
+ int64 id = 1;
+ string timestamp = 2;
+ string asset = 3;
+ double price = 4;
+ double amount = 5;
+ OrderType direction = 6;
+ repeated Trade trades = 7;
+ double pendingAmount = 8;
+}
+
+/**
+DTO to carry trade response
+ */
+message Trade {
+ int64 orderId = 1;
+ double amount = 2;
+ double price = 3;
+}
+
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/main/resources/application.properties
@@ -0,0 +1 @@
+
diff --git a/src/test/java/io/bux/matchingengine/cqrs/CommandBusTest.java b/src/test/java/io/bux/matchingengine/cqrs/CommandBusTest.java
new file mode 100644
index 0000000..3ee59ff
--- /dev/null
+++ b/src/test/java/io/bux/matchingengine/cqrs/CommandBusTest.java
@@ -0,0 +1,162 @@
+package io.bux.matchingengine.cqrs;
+
+import io.bux.matchingengine.domain.Book;
+import io.bux.matchingengine.domain.BookAggregateRepository;
+import io.bux.matchingengine.domain.query.OrderType;
+import io.bux.matchingengine.domain.command.CancelOrderCommand;
+import io.bux.matchingengine.domain.command.MakeOrderCommand;
+import io.bux.matchingengine.domain.bus.CommandBus;
+import io.bux.matchingengine.domain.engine.MatchingEngine;
+import io.bux.matchingengine.domain.events.CancellationRequestedEvent;
+import io.bux.matchingengine.domain.events.OrderAcceptedEvent;
+import org.junit.jupiter.api.*;
+import org.mockito.*;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * @author Stefan Dragisic
+ */
+class CommandBusTest {
+
+ private CommandBus commandBus;
+ private MatchingEngine matchingEngineMock;
+
+ @BeforeEach
+ public void setUp() {
+ BookAggregateRepository aggregateRepositoryMock = mock(BookAggregateRepository.class);
+ matchingEngineMock = mock(MatchingEngine.class);
+ Book book = new Book("instrumentId", matchingEngineMock);
+ when(aggregateRepositoryMock.load("instrumentId")).thenReturn(Mono.just(book));
+ commandBus = new CommandBus(aggregateRepositoryMock);
+ }
+
+
+ @Test
+ public void testMakeOrderCommand() {
+ StepVerifier.create(commandBus.sendCommand(new MakeOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ OrderType.BUY,
+ BigDecimal.ONE,
+ BigDecimal.valueOf(1))))
+ .expectNextMatches(result -> result instanceof OrderAcceptedEvent &&
+ result.aggregateId().equals("instrumentId")
+ && ((OrderAcceptedEvent) result).type() == OrderType.BUY
+ && ((OrderAcceptedEvent) result).amount().compareTo(BigDecimal.ONE) == 0
+ && ((OrderAcceptedEvent) result).price().compareTo(BigDecimal.ONE) == 0)
+ .verifyComplete();
+ verify(matchingEngineMock).placeOrder(anyLong(),
+ eq("instrumentId"),
+ any(Instant.class),
+ eq(OrderType.BUY),
+ eq(BigDecimal.ONE),
+ eq(BigDecimal.ONE));
+ }
+
+ @Test
+ public void testMakeOrderCommandMany() {
+ Mono sendCommands = commandBus.sendCommand(new MakeOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ OrderType.BUY,
+ BigDecimal.ONE,
+ BigDecimal.ONE))
+ .then(commandBus.sendCommand(new MakeOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ OrderType.SELL,
+ BigDecimal.ONE,
+ BigDecimal.valueOf(2))))
+ .then(commandBus.sendCommand(new MakeOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ OrderType.BUY,
+ BigDecimal.valueOf(2),
+ BigDecimal.valueOf(2))))
+ .then();
+
+ StepVerifier.create(sendCommands)
+ .verifyComplete();
+
+ ArgumentCaptor orderIdCaptor = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor orderTypeCaptor = ArgumentCaptor.forClass(OrderType.class);
+ ArgumentCaptor amountCaptor = ArgumentCaptor.forClass(BigDecimal.class);
+ ArgumentCaptor priceCaptor = ArgumentCaptor.forClass(BigDecimal.class);
+ verify(matchingEngineMock, times(3)).placeOrder(orderIdCaptor.capture(),
+ anyString(),
+ any(Instant.class),
+ orderTypeCaptor.capture(),
+ priceCaptor.capture(),
+ amountCaptor.capture());
+
+ assertEquals(orderIdCaptor.getAllValues(), List.of(1L, 2L, 3L));
+ assertEquals(orderTypeCaptor.getAllValues(), List.of(OrderType.BUY, OrderType.SELL, OrderType.BUY));
+ assertEquals(amountCaptor.getAllValues(), List.of(BigDecimal.ONE, BigDecimal.ONE, BigDecimal.valueOf(2)));
+ assertEquals(priceCaptor.getAllValues(), List.of(BigDecimal.ONE, BigDecimal.valueOf(2), BigDecimal.valueOf(2)));
+ }
+
+ @Test
+ public void testMakeInvalidOrderCommand() {
+ StepVerifier.create(commandBus.sendCommand(new MakeOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ OrderType.BUY,
+ BigDecimal.valueOf(-1),
+ BigDecimal.valueOf(-1))))
+ .expectErrorMatches(err -> err instanceof IllegalStateException
+ && err.getMessage().startsWith("Amount/Price needs to be larger then zero!"))
+ .verify();
+ verify(matchingEngineMock, times(0)).placeOrder(anyLong(),
+ anyString(),
+ any(Instant.class),
+ any(), any(BigDecimal.class), any(BigDecimal.class));
+ }
+
+ @Test
+ public void testCancelAllOrderCommand() {
+ StepVerifier.create(commandBus.sendCommand(new CancelOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ 1,
+ true,
+ BigDecimal.ZERO)))
+ .expectNextMatches(result -> result instanceof CancellationRequestedEvent &&
+ result.aggregateId().equals("instrumentId")
+ && ((CancellationRequestedEvent) result).orderId() == 1
+ && ((CancellationRequestedEvent) result).cancelAll())
+ .verifyComplete();
+ verify(matchingEngineMock).cancelAll(1, "instrumentId");
+ }
+
+ @Test
+ public void testCancelPartialOrderCommand() {
+ StepVerifier.create(commandBus.sendCommand(new CancelOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ 1,
+ false,
+ BigDecimal.TEN)))
+ .expectNextMatches(result -> result instanceof CancellationRequestedEvent &&
+ result.aggregateId().equals("instrumentId")
+ && ((CancellationRequestedEvent) result).orderId() == 1
+ && !((CancellationRequestedEvent) result).cancelAll()
+ && ((CancellationRequestedEvent) result).newAmount().compareTo(BigDecimal.TEN) == 0)
+ .verifyComplete();
+ verify(matchingEngineMock).cancel(1,"instrumentId", BigDecimal.TEN);
+ }
+
+ @Test
+ public void testCancelPartialInvalidOrderCommand() {
+ StepVerifier.create(commandBus.sendCommand(new CancelOrderCommand("instrumentId",
+ UUID.randomUUID(),
+ 1,
+ false,
+ BigDecimal.valueOf(-10))))
+ .expectErrorMatches(err -> err instanceof IllegalStateException
+ && err.getMessage().startsWith("Cancellation: new amount can't be <= 0!"))
+ .verify();
+ verify(matchingEngineMock, times(0)).cancel(anyLong(), anyString(), any(BigDecimal.class));
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/bux/matchingengine/domain/BookAggregateRepositoryTest.java b/src/test/java/io/bux/matchingengine/domain/BookAggregateRepositoryTest.java
new file mode 100644
index 0000000..50eb9d7
--- /dev/null
+++ b/src/test/java/io/bux/matchingengine/domain/BookAggregateRepositoryTest.java
@@ -0,0 +1,23 @@
+package io.bux.matchingengine.domain;
+
+import io.bux.matchingengine.domain.query.BookQueryRepository;
+import org.junit.jupiter.api.*;
+import reactor.test.StepVerifier;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author Stefan Dragisic
+ */
+class BookAggregateRepositoryTest {
+
+ private final BookAggregateRepository testSubject = new BookAggregateRepository(mock(BookQueryRepository.class));
+
+ @Test
+ public void loadOrCreate() {
+ StepVerifier.create(testSubject.load("instrumentId"))
+ .expectNextCount(1)
+ .verifyComplete();
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/io/bux/matchingengine/domain/BookQueryRepositoryTest.java b/src/test/java/io/bux/matchingengine/domain/BookQueryRepositoryTest.java
new file mode 100644
index 0000000..75a4f76
--- /dev/null
+++ b/src/test/java/io/bux/matchingengine/domain/BookQueryRepositoryTest.java
@@ -0,0 +1,159 @@
+package io.bux.matchingengine.domain;
+
+import io.bux.matchingengine.domain.query.OrderType;
+import io.bux.matchingengine.domain.query.BookQueryRepository;
+import io.bux.matchingengine.domain.engine.events.OrderMatchedEvent;
+import io.bux.matchingengine.domain.engine.events.OrderPlacedEvent;
+import org.junit.jupiter.api.*;
+import reactor.test.StepVerifier;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+/**
+ * @author Stefan Dragisic
+ */
+class BookQueryRepositoryTest {
+
+ private BookQueryRepository testSubject;
+
+ @BeforeEach
+ void setUp() {
+ testSubject = new BookQueryRepository();
+ }
+
+ @Test
+ void testOrderPlacedProjection() {
+ StepVerifier.create(testSubject.updateProjection(new OrderPlacedEvent(1L,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(10),
+ BigDecimal.valueOf(100)))
+ .then(testSubject.getOrder(1L)))
+ .expectNextMatches(orderEntry -> orderEntry.orderId() == 1L
+ && orderEntry.entryTimestamp().equals(Instant.MIN)
+ && orderEntry.direction() == OrderType.SELL
+ && orderEntry.price().compareTo(BigDecimal.valueOf(10)) == 0
+ && orderEntry.amount().compareTo(BigDecimal.valueOf(100)) == 0
+ )
+ .verifyComplete();
+ }
+
+
+ @Test
+ void buxTest() {
+ StepVerifier.create(testSubject.updateProjection(new OrderPlacedEvent(0L,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(
+ 43251.00),
+ BigDecimal.valueOf(
+ 1.0)))
+ .then(testSubject.getOrder(0L)))
+ .expectNextMatches(orderEntry -> orderEntry.orderId() == 0L
+ && orderEntry.entryTimestamp().equals(Instant.MIN)
+ && orderEntry.direction() == OrderType.SELL
+ && orderEntry.price().compareTo(BigDecimal.valueOf(43251.00))
+ == 0
+ && orderEntry.amount().compareTo(BigDecimal.valueOf(1.0)) == 0
+ && orderEntry.pendingAmount().compareTo(BigDecimal.valueOf(1.0))
+ == 0
+ && orderEntry.trades().isEmpty()
+ )
+ .expectComplete()
+ .verify();
+
+ StepVerifier.create(testSubject.updateProjection(new OrderPlacedEvent(1L,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(
+ 43250.00),
+ BigDecimal.valueOf(
+ 0.25)
+ ))
+ .then(testSubject.getOrder(1L)))
+ .expectNextMatches(orderEntry -> orderEntry.orderId() == 1L
+ && orderEntry.entryTimestamp().equals(Instant.MIN)
+ && orderEntry.direction() == OrderType.BUY
+ && orderEntry.price().compareTo(BigDecimal.valueOf(43250.00)) == 0
+ && orderEntry.amount().compareTo(BigDecimal.valueOf(0.25)) == 0
+ && orderEntry.pendingAmount().compareTo(BigDecimal.valueOf(0.25))
+ == 0
+ && orderEntry.trades().isEmpty()
+ )
+ .expectComplete()
+ .verify();
+
+ StepVerifier.create(testSubject.updateProjection(new OrderMatchedEvent(0L,
+ "BTC",
+ Instant.MAX,
+ 2L,
+ OrderType.BUY,
+ BigDecimal.valueOf(
+ 43250.00),
+ BigDecimal.valueOf(
+ 43251.00),
+ BigDecimal.valueOf(
+ 0.35),
+ BigDecimal.valueOf(
+ 1.0),
+ BigDecimal.valueOf(
+ 0.65)
+ ))
+ .then(testSubject.getOrder(0L)))
+ .expectNextMatches(orderEntry -> orderEntry.orderId() == 0L
+ && orderEntry.entryTimestamp().equals(Instant.MIN)
+ && orderEntry.direction() == OrderType.SELL
+ && orderEntry.price().compareTo(BigDecimal.valueOf(43251.00)) == 0
+ && orderEntry.amount().compareTo(BigDecimal.valueOf(1.0)) == 0
+ && orderEntry.pendingAmount().compareTo(BigDecimal.valueOf(0.65))
+ == 0
+ && orderEntry.trades().stream().allMatch(t -> t.orderId() == 2L
+ && t.amount().compareTo(BigDecimal.valueOf(0.35)) == 0
+ && t.price().compareTo(BigDecimal.valueOf(
+ 43251.00)) == 0)
+ )
+ .expectComplete()
+ .verify();
+
+
+ StepVerifier.create(testSubject.updateProjection(new OrderMatchedEvent(0L,
+ "BTC",
+ Instant.MAX,
+ 3L,
+ OrderType.BUY,
+ BigDecimal.valueOf(
+ 43250.00),
+ BigDecimal.valueOf(
+ 43251.00),
+ BigDecimal.valueOf(
+ 0.65),
+ BigDecimal.valueOf(
+ 0.65),
+ BigDecimal.valueOf(
+ 0.0)
+ ))
+ .then(testSubject.getOrder(0L)))
+ .expectNextMatches(orderEntry -> orderEntry.orderId() == 0L
+ && orderEntry.entryTimestamp().equals(Instant.MIN)
+ && orderEntry.direction() == OrderType.SELL
+ && orderEntry.price().compareTo(BigDecimal.valueOf(43251.00)) == 0
+ && orderEntry.amount().compareTo(BigDecimal.valueOf(1)) == 0
+ && orderEntry.pendingAmount().compareTo(BigDecimal.valueOf(0.0))
+ == 0
+ && orderEntry.trades().stream().anyMatch(t -> t.orderId() == 2L
+ && t.amount().compareTo(BigDecimal.valueOf(0.35)) == 0
+ && t.price().compareTo(BigDecimal.valueOf(
+ 43251.00)) == 0)
+ && orderEntry.trades().stream().anyMatch(t -> t.orderId() == 3L
+ && t.amount().compareTo(BigDecimal.valueOf(0.65)) == 0
+ && t.price().compareTo(BigDecimal.valueOf(
+ 43251.00)) == 0)
+ )
+ .expectComplete()
+ .verify();
+ }
+}
diff --git a/src/test/java/io/bux/matchingengine/domain/engine/MatchingEngineTest.java b/src/test/java/io/bux/matchingengine/domain/engine/MatchingEngineTest.java
new file mode 100644
index 0000000..368d0e5
--- /dev/null
+++ b/src/test/java/io/bux/matchingengine/domain/engine/MatchingEngineTest.java
@@ -0,0 +1,560 @@
+package io.bux.matchingengine.domain.engine;
+
+
+import io.bux.matchingengine.domain.query.OrderType;
+import io.bux.matchingengine.domain.engine.events.OrderCanceledEvent;
+import io.bux.matchingengine.domain.engine.events.OrderMatchedEvent;
+import io.bux.matchingengine.domain.engine.events.OrderPlacedEvent;
+import org.junit.jupiter.api.*;
+import reactor.test.StepVerifier;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+/**
+ * @author Stefan Dragisic
+ */
+class MatchingEngineTest {
+
+ private final MatchingEngine testSubject = new MatchingEngine();
+
+ @Test
+ public void buxTest() {
+ StepVerifier.create(testSubject.engineEvents().take(9))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(10.05),
+ BigDecimal.valueOf(20)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(10.04),
+ BigDecimal.valueOf(20)))
+ .then(() -> testSubject.placeOrder(3,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(10.05),
+ BigDecimal.valueOf(40)))
+ .then(() -> testSubject.placeOrder(5,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(10.02),
+ BigDecimal.valueOf(40)))
+ .then(() -> testSubject.placeOrder(4,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(10.00),
+ BigDecimal.valueOf(20)))
+ .then(() -> testSubject.placeOrder(6,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(10.00),
+ BigDecimal.valueOf(40)))
+ .expectNextCount(6)
+ .then(() -> testSubject.placeOrder(7,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(10.06),
+ BigDecimal.valueOf(55)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 2
+ && ((OrderMatchedEvent) orderEvent).previousRestingAmount().compareTo(BigDecimal.valueOf(20)) == 0
+ )
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 3)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void buxTest2() {
+ StepVerifier.create(testSubject.engineEvents().take(4))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(0L,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(43251.00),
+ BigDecimal.valueOf(1.0)))
+ .then(() -> testSubject.placeOrder(1L,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(43250.00),
+ BigDecimal.valueOf(0.25)))
+ .then(() -> testSubject.placeOrder(2L,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(43253.00),
+ BigDecimal.valueOf(0.35)))
+ .then(() -> testSubject.placeOrder(4L,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(43251.00),
+ BigDecimal.valueOf(0.65)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent
+ && ((OrderPlacedEvent) orderEvent).orderId() == 0L
+ && ((OrderPlacedEvent) orderEvent).amount().compareTo(BigDecimal.valueOf(1.0)) == 0
+ && ((OrderPlacedEvent) orderEvent).price().compareTo(BigDecimal.valueOf(43251.00)) == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent
+ && ((OrderPlacedEvent) orderEvent).orderId() == 1L
+ && ((OrderPlacedEvent) orderEvent).amount().compareTo(BigDecimal.valueOf(0.25)) == 0
+ && ((OrderPlacedEvent) orderEvent).price().compareTo(BigDecimal.valueOf(43250.00)) == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 0L
+ && ((OrderMatchedEvent) orderEvent).incomingId() == 2L
+ && ((OrderMatchedEvent) orderEvent).orderType() == OrderType.BUY
+ && ((OrderMatchedEvent) orderEvent).restingPrice().compareTo(BigDecimal.valueOf(43251.00)) == 0
+ && ((OrderMatchedEvent) orderEvent).incomingAmount().compareTo(BigDecimal.valueOf(0.35)) == 0
+ && ((OrderMatchedEvent) orderEvent).previousRestingAmount().compareTo(BigDecimal.valueOf(1.0)) == 0
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.valueOf(0.65)) == 0
+ )
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 0L
+ && ((OrderMatchedEvent) orderEvent).incomingId() == 4L
+ && ((OrderMatchedEvent) orderEvent).orderType() == OrderType.BUY
+ && ((OrderMatchedEvent) orderEvent).restingPrice().compareTo(BigDecimal.valueOf(43251.00)) == 0
+ && ((OrderMatchedEvent) orderEvent).incomingAmount().compareTo(BigDecimal.valueOf(0.65)) == 0
+ && ((OrderMatchedEvent) orderEvent).previousRestingAmount().compareTo(BigDecimal.valueOf(0.65)) == 0
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.valueOf(0.0)) == 0
+ )
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void filledBuyTest() {
+ StepVerifier.create(testSubject.engineEvents().take(2))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(100.0),
+ BigDecimal.valueOf(1.0)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(100.0),
+ BigDecimal.valueOf(1.0)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).orderType() == OrderType.BUY)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void filledSellTest() {
+ StepVerifier.create(testSubject.engineEvents().take(2))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(100.10),
+ BigDecimal.valueOf(10.1)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(100.10),
+ BigDecimal.valueOf(10.1)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).orderType() == OrderType.SELL)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void multiBuy() {
+ StepVerifier.create(testSubject.engineEvents().take(5))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000.0),
+ BigDecimal.valueOf(1.00)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1001.0),
+ BigDecimal.valueOf(1.00)))
+ .then(() -> testSubject.placeOrder(3,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(999.0),
+ BigDecimal.valueOf(0.50)))
+ .then(() -> testSubject.placeOrder(4,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000.0),
+ BigDecimal.valueOf(1.00)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 3
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.ZERO)
+ == 0
+ && ((OrderMatchedEvent) orderEvent).restingPrice().compareTo(BigDecimal.valueOf(999.0))
+ == 0
+ && ((OrderMatchedEvent) orderEvent).entryTimestamp().equals(Instant.MIN)
+ )
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ && ((OrderMatchedEvent) orderEvent).restingPrice().compareTo(BigDecimal.valueOf(1000.0))
+ == 0
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount()
+ .compareTo(BigDecimal.valueOf(0.50)) == 0)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void multiSell() {
+ StepVerifier.create(testSubject.engineEvents().take(5))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1.000),
+ BigDecimal.valueOf(1.0)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(0.999),
+ BigDecimal.valueOf(1.0)))
+ .then(() -> testSubject.placeOrder(3,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1.001),
+ BigDecimal.valueOf(0.5)))
+ .then(() -> testSubject.placeOrder(4,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1),
+ BigDecimal.valueOf(1.0)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 3
+ && ((OrderMatchedEvent) orderEvent).restingPrice().compareTo(BigDecimal.valueOf(1.001))
+ == 0
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.ZERO)
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ && ((OrderMatchedEvent) orderEvent).restingPrice().compareTo(BigDecimal.valueOf(1)) == 0
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount()
+ .compareTo(BigDecimal.valueOf(0.5)) == 0)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void partialBuy() {
+ StepVerifier.create(testSubject.engineEvents().take(3))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.ZERO)
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent
+ && ((OrderPlacedEvent) orderEvent).orderId() == 2
+ && ((OrderPlacedEvent) orderEvent).price().compareTo(BigDecimal.valueOf(1000)) == 0
+ && ((OrderPlacedEvent) orderEvent).amount().compareTo(BigDecimal.valueOf(50)) == 0)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void partialSell() {
+ StepVerifier.create(testSubject.engineEvents().take(3))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.ZERO)
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent
+ && ((OrderPlacedEvent) orderEvent).orderId() == 2
+ && ((OrderPlacedEvent) orderEvent).price().compareTo(BigDecimal.valueOf(1000)) == 0
+ && ((OrderPlacedEvent) orderEvent).amount().compareTo(BigDecimal.valueOf(50)) == 0)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void partialBidFill() {
+ StepVerifier.create(testSubject.engineEvents().take(5))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(3,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(4,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(5,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ &&
+ ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.valueOf(50))
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.ZERO)
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void partialAskFill() {
+ StepVerifier.create(testSubject.engineEvents().take(4))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(3,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(4,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(50)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ &&
+ ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.valueOf(50))
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent
+ && ((OrderMatchedEvent) orderEvent).restingId() == 1
+ && ((OrderMatchedEvent) orderEvent).restingRemainingAmount().compareTo(BigDecimal.ZERO)
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectComplete()
+ .verify();
+ }
+
+
+ @Test
+ public void cancel() {
+ StepVerifier.create(testSubject.engineEvents().take(3))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.cancelAll(1, "BTC"))
+ .then(() -> testSubject.placeOrder(3,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderCanceledEvent
+ && ((OrderCanceledEvent) orderEvent).canceledAmount().compareTo(BigDecimal.valueOf(100))
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void partialCancel() {
+ StepVerifier.create(testSubject.engineEvents().take(4))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.cancel(1,"BTC", BigDecimal.valueOf(75)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderCanceledEvent
+ && ((OrderCanceledEvent) orderEvent).canceledAmount().compareTo(BigDecimal.valueOf(25))
+ == 0)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void ineffectiveCancel() {
+ StepVerifier.create(testSubject.engineEvents().take(2))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.cancel(1,"BTC", BigDecimal.valueOf(100)))
+ .then(() -> testSubject.cancel(1, "BTC",BigDecimal.valueOf(100)))
+ .then(() -> testSubject.cancel(1, "BTC",BigDecimal.valueOf(100)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void cancelNonExisting() {
+ StepVerifier.create(testSubject.engineEvents().take(2))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.cancel(3,"BTC", BigDecimal.valueOf(50)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void sameId() {
+ StepVerifier.create(testSubject.engineEvents().take(3))
+ .expectSubscription()
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.placeOrder(2,
+ "BTC",
+ Instant.MIN,
+ OrderType.SELL,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .then(() -> testSubject.placeOrder(1,
+ "BTC",
+ Instant.MIN,
+ OrderType.BUY,
+ BigDecimal.valueOf(1000),
+ BigDecimal.valueOf(100)))
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderMatchedEvent)
+ .expectNextMatches(orderEvent -> orderEvent instanceof OrderPlacedEvent)
+ .expectComplete()
+ .verify();
+ }
+}
diff --git a/src/test/java/io/bux/matchingengine/integration/IntegrationTest.java b/src/test/java/io/bux/matchingengine/integration/IntegrationTest.java
new file mode 100644
index 0000000..d5a7b23
--- /dev/null
+++ b/src/test/java/io/bux/matchingengine/integration/IntegrationTest.java
@@ -0,0 +1,269 @@
+package io.bux.matchingengine.integration;
+
+import io.bux.matchingengine.MatchingEngineApplication;
+import io.bux.matchingengine.api.protobuf.OrderStatusResponse;
+import io.bux.matchingengine.api.protobuf.OrderType;
+import org.junit.*;
+import org.junit.runner.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author Stefan Dragisic
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = MatchingEngineApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+public class IntegrationTest {
+
+ private final Logger logger = LoggerFactory.getLogger(IntegrationTest.class);
+
+ private final String SIMPLE_SELL_ORDER = """
+ {
+ "asset": "BTC",
+ "price": 43251.00,
+ "amount": 1.0,
+ "direction": "SELL"
+ }
+ """.stripIndent();
+
+ private final String SIMPLE_BUY_ORDER = """
+ {
+ "asset": "BTC",
+ "price": 43252.00,
+ "amount": 0.25,
+ "direction": "BUY"
+ }
+ """.stripIndent();
+ private final String BTC_SELL_ORDER = """
+ {
+ "asset": "BTC",
+ "price": 40000.00,
+ "amount": 1.0,
+ "direction": "SELL"
+ }
+ """.stripIndent();
+ private final String BTC_BUY_ORDER = """
+ {
+ "asset": "BTC",
+ "price": 40000.00,
+ "amount": 1,
+ "direction": "BUY"
+ }
+ """.stripIndent();
+ private final String SOL_SELL_ORDER = """
+ {
+ "asset": "BTC",
+ "price": 40000.00,
+ "amount": 1.0,
+ "direction": "SELL"
+ }
+ """.stripIndent();
+ private final String SOL_BUY_ORDER = """
+ {
+ "asset": "BTC",
+ "price": 40000.00,
+ "amount": 1,
+ "direction": "BUY"
+ }
+ """.stripIndent();
+ @LocalServerPort
+ private int port;
+ private WebClient client;
+
+ @Before
+ public void setUp() {
+ client = WebClient.builder()
+ .baseUrl("http://localhost:" + port)
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .build();
+ }
+
+ @Test
+ public void simplePlaceAndGetOrder() {
+ AtomicLong id = new AtomicLong();
+ StepVerifier.create((client.post()
+ .uri("/orders"))
+ .body(Mono.just(SIMPLE_SELL_ORDER), String.class).header(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .doOnNext(or -> id.set(or.getId())))
+ .expectNextMatches(response ->
+ response.getAsset().equals("BTC")
+ && response.getPrice() == 43251.00
+ && response.getAmount() == 1.0
+ && response.getDirection().equals(OrderType.SELL)
+ && response.getPendingAmount() == 1.0)
+ .verifyComplete();
+
+ StepVerifier.create((client.get()
+ .uri("/orders/" + id + "/"))
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class))
+ .expectNextMatches(response ->
+ response.getAsset().equals("BTC")
+ && response.getPrice() == 43251.00
+ && response.getAmount() == 1.0
+ && response.getDirection().equals(OrderType.SELL)
+ && response.getPendingAmount() == 1.0)
+ .verifyComplete();
+ }
+
+ @Test
+ public void simpleCancelOrder() {
+ AtomicLong id = new AtomicLong();
+ StepVerifier.create((client.post()
+ .uri("/orders"))
+ .body(Mono.just(SIMPLE_SELL_ORDER), String.class).header(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .doOnNext(or -> id.set(or.getId())))
+ .expectNextMatches(response ->
+ response.getAsset().equals("BTC")
+ && response.getPrice() == 43251.00
+ && response.getAmount() == 1.0
+ && response.getDirection().equals(OrderType.SELL)
+ && response.getPendingAmount() == 1.0)
+ .verifyComplete();
+
+ StepVerifier.create((client.post()
+ .uri("/orders/" + id + "/cancel"))
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(String.class))
+ .expectNextMatches(response -> response.equals("OK"))
+ .verifyComplete();
+ }
+
+ @Test
+ public void simpleTradeTest() {
+ AtomicLong id = new AtomicLong();
+ AtomicLong id2 = new AtomicLong();
+ StepVerifier.create((client.post()
+ .uri("/orders"))
+ .body(Mono.just(SIMPLE_SELL_ORDER), String.class)
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .doOnNext(or -> id.set(or.getId())))
+ .expectNextMatches(response ->
+ response.getAsset().equals("BTC")
+ && response.getPrice() == 43251.00
+ && response.getAmount() == 1.0
+ && response.getDirection().equals(OrderType.SELL)
+ && response.getPendingAmount() == 1.0)
+ .verifyComplete();
+
+ StepVerifier.create((client.post()
+ .uri("/orders"))
+ .body(Mono.just(SIMPLE_BUY_ORDER), String.class)
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .doOnNext(or -> id2.set(or.getId())))
+ .expectNextMatches(response ->
+ response.getAsset().equals("BTC")
+ && response.getPrice() == 43252.00
+ && response.getAmount() == 0.25
+ && response.getDirection().equals(OrderType.BUY))
+ .verifyComplete();
+
+ StepVerifier.create((client.get()
+ .uri("/orders/" + id + "/"))
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class))
+ .expectNextMatches(response ->
+ response.getAsset().equals("BTC")
+ && response.getPrice() == 43251.00
+ && response.getAmount() == 1.0
+ && response.getDirection().equals(OrderType.SELL)
+ && response.getPendingAmount() == 0.75
+ && response.getTradesList().stream()
+ .anyMatch(t -> t.getOrderId() == id2.get()
+ && t.getAmount() == 0.25
+ && t.getPrice() == 43251.0
+ )
+ )
+ .verifyComplete();
+ }
+
+ @Test
+ public void stressTest() {
+ Mono btcSellOrderMono = WebClient.builder()
+ .baseUrl("http://localhost:" + port)
+ .defaultHeader(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .build().post()
+ .uri("/orders")
+ .body(Mono.just(BTC_SELL_ORDER), String.class)
+ .header(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class).timeout(
+ Duration.ofSeconds(1));
+
+ Mono btcBuyOrderMono = WebClient.builder()
+ .baseUrl("http://localhost:" + port)
+ .defaultHeader(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .build().post()
+ .uri("/orders")
+ .body(Mono.just(BTC_BUY_ORDER), String.class)
+ .header(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .subscribeOn(Schedulers.boundedElastic()).timeout(
+ Duration.ofSeconds(1));
+
+ Mono solSellOrderMono = WebClient.builder()
+ .baseUrl("http://localhost:" + port)
+ .defaultHeader(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .build().post()
+ .uri("/orders")
+ .body(Mono.just(SOL_SELL_ORDER), String.class)
+ .header(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .subscribeOn(Schedulers.boundedElastic()).timeout(
+ Duration.ofSeconds(1));
+
+ Mono solBuyOrderMono = WebClient.builder()
+ .baseUrl("http://localhost:" + port)
+ .defaultHeader(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .build().post()
+ .uri("/orders")
+ .body(Mono.just(SOL_BUY_ORDER), String.class)
+ .header(HttpHeaders.CONTENT_TYPE,
+ MediaType.APPLICATION_JSON_VALUE)
+ .retrieve().bodyToMono(OrderStatusResponse.class)
+ .subscribeOn(Schedulers.boundedElastic()).timeout(
+ Duration.ofSeconds(1));
+
+ Flux buySellAll = Flux.merge(btcBuyOrderMono,
+ solSellOrderMono,
+ btcSellOrderMono,
+ solBuyOrderMono);
+
+ //16 threads
+ //2 district asset
+ //2 operations per asset (BUY/SELL)
+ //executed 1000 times
+ //total 4000 operations
+ Duration duration = StepVerifier.create(Flux.range(0, 1000).flatMap(unused -> buySellAll, 16))
+ .expectNextCount(4000)
+ .verifyComplete();
+
+ logger.info("Stress test took: {} ms", duration.toMillis());
+ logger.info("Average: {} ms / end to end rest call", duration.toMillis() / 4000);
+ }
+}
\ No newline at end of file