From 3bf9ed7d9b207dc07171bdbe7aa9808c63e4ca52 Mon Sep 17 00:00:00 2001 From: Stefan <91stefan@gmail.com> Date: Sat, 25 Jun 2022 14:48:57 +0200 Subject: [PATCH] refactor and cleanup --- README.md | 48 ++++++++++--------- .../schananas/reactivestockmarket/Config.java | 28 +++++++++++ .../ReactiveStockMarketApplication.java | 10 ---- .../domain/BookAggregateRepository.java | 4 +- .../domain/bus/CommandBus.java | 4 +- .../domain/query/BookQueryRepository.java | 4 +- 6 files changed, 62 insertions(+), 36 deletions(-) create mode 100644 src/main/java/com/github/schananas/reactivestockmarket/Config.java diff --git a/README.md b/README.md index 0fe88d6..846385f 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ This project takes you through the design of simple reactive stock market applic - Matching engine implemented using Max-Heap and Min-Heap - Light custom Reactive CQRS framework - Lockless - light thread synchronisation and good scalability potential +- In-memory storage - in this demo all data is stored in memory without persistent storage - Application supports backpressure and event streaming ### Anatomy @@ -132,53 +133,54 @@ So how can we reduce complexity and improve performance for high load? In this p ![Reactive shared](img/reactive_shared.svg) -Using reactor pattern commands are de-multiplexed (see [CommandBus:80](src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java)) to a single "flow" of execution. I'm using word flow instead of thread here, as threads can arbitrarily be changed in Project Reactor, but that does not matter as we model our flow in such way that we know which components can be only accessed sequentially and which concurrently. +Using reactor pattern commands are de-multiplexed (see [CommandBus:80](src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java)) to a single "flow" of execution. Word flow is used instead of thread here, as threads can arbitrarily be changed in Project Reactor, but that doesn't matter as we model our flow in such way that we know which components can be only accessed sequentially and which concurrently. Then we place our components within this flow. Each component executes one intent/command at the time, like any synchronized component from blocking example would. Once command is executed, next is taken from flow and gets executed. There is also option to specify what is maximum time allowed to access component, preventing potential congestion. Now components can be single threaded without any concurrency protection complexity. Once all steps from flow have been executed, user is asynchronously notified with response. **Reactive with parallel execution** -So what if some components can be accessed in parallel? In case of this project, if two users are bidding for two different assets/instruments we can execute their orders in parallel as assets are two logically separated components. (see [Book aggregate](src/main/java/com/github/schananas/reactivestockmarket/domain/Book.java)) +So what if some components can be accessed in parallel? In case of this project, if two users are bidding for distinct assets/instruments we can execute their orders in parallel as assets are two logically separated components. (see [Book aggregate](src/main/java/com/github/schananas/reactivestockmarket/domain/Book.java)) In this case we just multiplex flow again and split it to two separate flows, each executing commands and orders for distinct assets. (see [CommandBus:51](src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java)) ![Reactive non-shared](img/reactive_non_shared.svg) -**Real example** +**Reactive stock market** -Given this, this is what we would like to a achive. +Applying reactive pattern to our reactive stock market we get following flow: ![Real example](img/reactive_wanted.svg) -First step to de-multiplex user commands is not necessary as we multiplex right after. -But it's done to preserve order of commands and to put all commands into project reactor context in order deal easier with further parallelism and concurrency. -We multiplex pipeline for each aggregate id (instrument id). +First step is to de-multiplex user commands to preserve order of commands and to deal easier with further parallelism and concurrency. +Now we can multiplex pipeline for each aggregate id (instrument id), meaning we will execute commands for distinct assets in parallel. Now as we have parallel pipelines for distinct instruments, we remove most of the synchronisation from our components like Matching Engine. -Question arises, why are aggregate and query repository represented as non-shared components, when they are singletons which all pipelines can access concurrently? -Both components use ConcurrentHashMap (todo change implemenation) which in contract to SyrnoziedMap is not locking whole map on update. Instead ConcurrentHashMap is devided into segments and each segment maintains its own lock. Any thread that wants to enter into segment have to acquire that segments lock. -Number of segments is decided by the parameter called concurrency levl which is passed while instantiating ConcurrentHashMap.As we multiplexed our pipelines per instrument id, we know that each pipleline will access map segments sequentialy, therefore whole pipleline remains lockless. +Question arises, why are [aggregate](src/main/java/com/github/schananas/reactivestockmarket/domain/BookAggregateRepository.java) and [query](src/main/java/com/github/schananas/reactivestockmarket/domain/query/BookQueryRepository.java) repository represented as non-shared components, when they are singletons which all pipelines can access concurrently? +Both components use ConcurrentHashMap which in contract to SynchronizedMap is not locking whole map on update. Instead, ConcurrentHashMap is divided into segments and each segment maintains its own lock. Any thread that wants to enter into segment have to acquire that segments lock. +Number of segments is decided by the parameter called `Config::DEFAULT_CONCURRENCY_LEVEL` which is passed while instantiating ConcurrentHashMap. As we multiplexed our pipelines per instrument id, we know that each pipleline will access map segments sequentialy, therefore whole pipleline remains lockless. **How to scale?** -Vertical horizontal... +With reactive and lockless implementation scaling options become more apparent. +Vertical scaling by adding more memory and CPU cores would allow application to handle more assets in parallel and investing into CPU with good single thread performance would directly impact matching engine performance. +Horizontal scaling would require deploying multiple instances of this application as microservices so that they can run in parallel across multiple machines. Then highly traded instrument could be handled on dedicated machine while all other can be handled by one shared machine. -** Double vs BigDecimal ** -Because floats and doubles cannot accurately represent the base 10 multiples that we use for money. +### Double vs BigDecimal +Floats and doubles cannot accurately represent the base 10 multiples that we use for money. This issue isn't just for Java, it's for any programming language that uses base 2 floating-point types. -Bigdecimal pitfalls: -https://blogs.oracle.com/javamagazine/post/four-common-pitfalls-of-the-bigdecimal-class-and-how-to-avoid-them +This project uses [BigDecimal](https://docs.oracle.com/javase/8/docs/api/java/math/BigDecimal.html#:~:text=A%20BigDecimal%20consists%20of%20an,the%20negation%20of%20the%20scale.) for representing numbers. +The disadvantage of BigDecimal is that it's slower and there are [common pitfalls](https://blogs.oracle.com/javamagazine/post/four-common-pitfalls-of-the-bigdecimal-class-and-how-to-avoid-them) that developer should be aware of. -### Optimisation and improvements +### Where to go from here + +- **Embrace eventual consistency**: Synchronous systems rely on request/response semantics as in, we invoke a method or REST endpoint and expect a response. This project implements this approach due to original system requirements. + Better approach would be to subscribe to a dedicated SSE or WebSocket endpoint which will emit all the events once they have been materialized. After all UI is just one type of projection, so build it same as any other projection! + **_Further contributions of this project will aim to demonstrate this approach too._** +- **Going to production**: As you probably noticed this project stores all events in memory and its long way from being production ready. Sole purpose of this project is to educate. + It may fool you that writing your own CQRS Reactive framework is easy. There are many things that CQRS framework should be able to handle which are not implemented in this project, like snapshots, persistent storage support, upcasting, event processors that allows you to replay events etc... + If you are looking for production ready alternative, [Axon Framework](https://github.com/AxonFramework/AxonFramework) could be a good staring point. Axon Framework is feature rich framework that is available for years now witch established community and large number of extensions. It also supports variety of Event Stores and message brokers. If you need something more lightweight, another interesting reactive and function framework that you should take a look is [FModel](https://github.com/fraktalio/fmodel). -- Embrace eventual consistency -- Don't block while waiting for response -- REST should not return projection -- UI is also a different type of projection - build it using events! -- Synchronous systems rely on request/response semantics; as in, we invoke a method or RESTful endpoint and expect a response. -- The caller blocks until receiving a response. If the response is delayed or fails, this can lead to resource starvation due to accumulating timeouts. At the very minimum, any delay when integrating with third-party systems can cause noticeable latency in our services. -- ### Tests - [x] Unit tests diff --git a/src/main/java/com/github/schananas/reactivestockmarket/Config.java b/src/main/java/com/github/schananas/reactivestockmarket/Config.java new file mode 100644 index 0000000..521f8f0 --- /dev/null +++ b/src/main/java/com/github/schananas/reactivestockmarket/Config.java @@ -0,0 +1,28 @@ +package com.github.schananas.reactivestockmarket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.converter.protobuf.ProtobufHttpMessageConverter; +import org.springframework.web.client.RestTemplate; + +import java.util.List; + +/** + * @author Stefan Dragisic + */ +@Configuration +public class Config { + + public static int DEFAULT_CONCURRENCY_LEVEL = 32; + + @Bean + RestTemplate restTemplate(ProtobufHttpMessageConverter hmc) { + return new RestTemplate(List.of(hmc)); + } + + @Bean + ProtobufHttpMessageConverter protobufHttpMessageConverter() { + return new ProtobufHttpMessageConverter(); + } + +} diff --git a/src/main/java/com/github/schananas/reactivestockmarket/ReactiveStockMarketApplication.java b/src/main/java/com/github/schananas/reactivestockmarket/ReactiveStockMarketApplication.java index 3682477..34204b2 100644 --- a/src/main/java/com/github/schananas/reactivestockmarket/ReactiveStockMarketApplication.java +++ b/src/main/java/com/github/schananas/reactivestockmarket/ReactiveStockMarketApplication.java @@ -11,16 +11,6 @@ import java.util.List; @SpringBootApplication public class ReactiveStockMarketApplication { - @Bean - RestTemplate restTemplate(ProtobufHttpMessageConverter hmc) { - return new RestTemplate(List.of(hmc)); - } - - @Bean - ProtobufHttpMessageConverter protobufHttpMessageConverter() { - return new ProtobufHttpMessageConverter(); - } - public static void main(String[] args) { SpringApplication.run(ReactiveStockMarketApplication.class, args); } diff --git a/src/main/java/com/github/schananas/reactivestockmarket/domain/BookAggregateRepository.java b/src/main/java/com/github/schananas/reactivestockmarket/domain/BookAggregateRepository.java index c2ecf2c..1054032 100644 --- a/src/main/java/com/github/schananas/reactivestockmarket/domain/BookAggregateRepository.java +++ b/src/main/java/com/github/schananas/reactivestockmarket/domain/BookAggregateRepository.java @@ -7,6 +7,8 @@ import reactor.core.publisher.Mono; import java.util.concurrent.ConcurrentHashMap; +import static com.github.schananas.reactivestockmarket.Config.DEFAULT_CONCURRENCY_LEVEL; + /** * Thread-safe implementation of {@link AggregateRepository} used to store Book aggregates * @@ -15,7 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; @Component("aggregateRepository") public class BookAggregateRepository implements AggregateRepository { - private final ConcurrentHashMap aggregates = new ConcurrentHashMap<>(); + private final ConcurrentHashMap aggregates = new ConcurrentHashMap<>(32,0.75f,DEFAULT_CONCURRENCY_LEVEL); private final BookQueryRepository bookQueryRepository; public BookAggregateRepository(BookQueryRepository bookQueryRepository) { diff --git a/src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java b/src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java index 9cc7d16..c05a08a 100644 --- a/src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java +++ b/src/main/java/com/github/schananas/reactivestockmarket/domain/bus/CommandBus.java @@ -14,6 +14,8 @@ import reactor.core.scheduler.Schedulers; import java.util.function.Consumer; import javax.annotation.PreDestroy; +import static com.github.schananas.reactivestockmarket.Config.DEFAULT_CONCURRENCY_LEVEL; + /** * Routes commands to corresponding aggregate. *

@@ -60,7 +62,7 @@ public class CommandBus { .doOnError(cmd::signalError) - ))) + )), DEFAULT_CONCURRENCY_LEVEL) .subscribe(); } diff --git a/src/main/java/com/github/schananas/reactivestockmarket/domain/query/BookQueryRepository.java b/src/main/java/com/github/schananas/reactivestockmarket/domain/query/BookQueryRepository.java index 4a725a3..d31210e 100644 --- a/src/main/java/com/github/schananas/reactivestockmarket/domain/query/BookQueryRepository.java +++ b/src/main/java/com/github/schananas/reactivestockmarket/domain/query/BookQueryRepository.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import static com.github.schananas.reactivestockmarket.Config.DEFAULT_CONCURRENCY_LEVEL; + /** * Thread-safe implementation of {@link QueryRepository} used to store order & book projections. * @@ -21,7 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList; @Component public class BookQueryRepository implements QueryRepository { - private final ConcurrentHashMap projection = new ConcurrentHashMap<>(); + private final ConcurrentHashMap projection = new ConcurrentHashMap<>(32,0.75f,DEFAULT_CONCURRENCY_LEVEL); /** * Returns current order projection