This commit is contained in:
Stefan
2022-06-17 18:06:16 +02:00
parent 332ee608cf
commit 9847bd99d5
6 changed files with 24 additions and 29 deletions

View File

@@ -2,11 +2,17 @@
### Introduction
- Reactive CQRS application with matching engine as it core
- Matching engine implemented using Max-Heap and Min-Heap
- Supports backpressure and event streaming
This project demonstrates reactive implementation of simple stock exchange platform.
Originally assigment is given to Senior Software Engineers as a technical code interview in stock/crypto exchange companies.
See original [system requirements](system_requirements.pdf)
### Reasoning
Takeaways of implementation:
- Spring Boot application with matching engine as it core
- Custom Reactive CQRS framework
- Matching engine implemented using Max-Heap and Min-Heap
- Application supports backpressure and event streaming
### Implementation
- **Matching engine**
+ Matching engine uses Max-Heap and Min-Heap
@@ -17,7 +23,6 @@
+ Sell tree - The collection of orders in the descending order, that is, lower sell prices have priority to be matched over higher.
+ Each state transition is the consequence of an event. Events are played sequentially and therefore engine is single-threaded. Thread synchronisation is handled outside of engine.
- **Reactive**
+ Asynchronous, event driven, non-blocking programming perfectly fits for given problem. We want to subscribe to engine updates instead of blocking the threads.
+ Business logic can be broken down into a pipeline of steps where each of the steps can be executed asynchronously
@@ -46,16 +51,6 @@
- [x] Integration tests
- [x] Load test (stress test)
**Coverage:**
| Classes | Lines |
|----|-----------|
| 100% | 96% |
### Bonus
- [x] Order cancellation
- [ ] Reactive Book UI
### How to run
Execute `mvn clean install` to build project and generate protobuf classes.

View File

@@ -17,10 +17,10 @@ public interface QueryRepository<T> {
Mono<Void> updateProjection(Event e);
/**
* Returns current order projection
* Returns materialized projection
*
* @param orderId - order identifier
* @param projectionId - order identifier
* @return - materialized projection
*/
Mono<T> getOrder(long orderId);
Mono<T> getProjection(long projectionId);
}

View File

@@ -35,7 +35,7 @@ public class BookAggregateRepository implements AggregateRepository<Book> {
return Mono.fromCallable(() -> aggregates.computeIfAbsent(aggregateId, (k) -> {
Book book = new Book(aggregateId);
//subscribe query projection for book events
book.aggregateEvents().flatMap(bookQueryRepository::updateProjection).subscribe();
book.aggregateEvents().concatMap(bookQueryRepository::updateProjection).subscribe();
return book;
}));
}

View File

@@ -30,7 +30,7 @@ public class BookQueryRepository implements QueryRepository<OrderEntry> {
* @return - materialized projection
*/
@Override
public Mono<OrderEntry> getOrder(long orderId) {
public Mono<OrderEntry> getProjection(long orderId) {
return Mono.fromCallable(() -> projection.get(orderId));
}

View File

@@ -65,7 +65,7 @@ public class MarketController {
/**
* Not used - POC
* Intended to UI or client applications to mantain their own projection
* Intended to UI or client applications to maintain their own projection
*
* @param asset
* @return streams all events from aggregate
@@ -84,7 +84,7 @@ public class MarketController {
*/
@GetMapping("/orders/{orderId}")
public Mono<OrderStatusResponse> getOrder(@PathVariable Long orderId) {
return bookQueryRepository.getOrder(orderId)
return bookQueryRepository.getProjection(orderId)
.map(this::toOrderStatus);
}
@@ -96,7 +96,7 @@ public class MarketController {
*/
@PostMapping("/orders/{orderId}/cancel")
public Mono<ResponseEntity<String>> cancelOrder(@PathVariable Long orderId) {
return bookQueryRepository.getOrder(orderId)
return bookQueryRepository.getProjection(orderId)
.flatMap(MarketController::validateOrderAmount)
.flatMap(this::sendCancelCommand)
.switchIfEmpty(Mono.error(new IllegalStateException(
@@ -114,7 +114,7 @@ public class MarketController {
}
private Mono<? extends OrderEntry> getOrderProjection(OrderAcceptedEvent ev) {
return bookQueryRepository.getOrder(ev.orderId())
return bookQueryRepository.getProjection(ev.orderId())
.repeatWhenEmpty(5, o -> o.delayElements(
Duration.ofMillis(50)));
}

View File

@@ -30,7 +30,7 @@ class BookQueryRepositoryTest {
OrderType.SELL,
BigDecimal.valueOf(10),
BigDecimal.valueOf(100)))
.then(testSubject.getOrder(1L)))
.then(testSubject.getProjection(1L)))
.expectNextMatches(orderEntry -> orderEntry.orderId() == 1L
&& orderEntry.entryTimestamp().equals(Instant.MIN)
&& orderEntry.direction() == OrderType.SELL
@@ -51,7 +51,7 @@ class BookQueryRepositoryTest {
43251.00),
BigDecimal.valueOf(
1.0)))
.then(testSubject.getOrder(0L)))
.then(testSubject.getProjection(0L)))
.expectNextMatches(orderEntry -> orderEntry.orderId() == 0L
&& orderEntry.entryTimestamp().equals(Instant.MIN)
&& orderEntry.direction() == OrderType.SELL
@@ -74,7 +74,7 @@ class BookQueryRepositoryTest {
BigDecimal.valueOf(
0.25)
))
.then(testSubject.getOrder(1L)))
.then(testSubject.getProjection(1L)))
.expectNextMatches(orderEntry -> orderEntry.orderId() == 1L
&& orderEntry.entryTimestamp().equals(Instant.MIN)
&& orderEntry.direction() == OrderType.BUY
@@ -103,7 +103,7 @@ class BookQueryRepositoryTest {
BigDecimal.valueOf(
0.65)
))
.then(testSubject.getOrder(0L)))
.then(testSubject.getProjection(0L)))
.expectNextMatches(orderEntry -> orderEntry.orderId() == 0L
&& orderEntry.entryTimestamp().equals(Instant.MIN)
&& orderEntry.direction() == OrderType.SELL
@@ -136,7 +136,7 @@ class BookQueryRepositoryTest {
BigDecimal.valueOf(
0.0)
))
.then(testSubject.getOrder(0L)))
.then(testSubject.getProjection(0L)))
.expectNextMatches(orderEntry -> orderEntry.orderId() == 0L
&& orderEntry.entryTimestamp().equals(Instant.MIN)
&& orderEntry.direction() == OrderType.SELL