Isolation of blocking operation into the different thread poll

This commit is contained in:
Michal Zeman
2020-02-09 10:39:31 +01:00
parent 2072882524
commit c9e9434fcd
8 changed files with 94 additions and 41 deletions

View File

@@ -3,6 +3,7 @@ package com.mz.reactor.ddd.reactorddd.account.api;
import com.mz.reactor.ddd.common.components.http.HttpHandler;
import com.mz.reactor.ddd.reactorddd.account.api.model.*;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountState;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
@@ -10,6 +11,7 @@ import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
@@ -20,9 +22,12 @@ public class AccountHandler implements HttpHandler {
private final AccountQuery accountQuery;
public AccountHandler(AccountApplicationService service, AccountQuery accountQuery) {
private final Scheduler scheduler;
public AccountHandler(AccountApplicationService service, AccountQuery accountQuery, @Qualifier("JsonDesScheduler") Scheduler scheduler) {
this.service = service;
this.accountQuery = accountQuery;
this.scheduler = scheduler;
}
public Mono<ServerResponse> getById(ServerRequest request) {
@@ -37,8 +42,7 @@ public class AccountHandler implements HttpHandler {
}
public Mono<ServerResponse> createAccount(ServerRequest request) {
return request
.bodyToMono(CreateAccountRequest.class)
return bodyToMono(request, CreateAccountRequest.class, scheduler)
.map(CreateAccountRequest::payload)
.flatMap(service::execute)
.map(CreateAccountResponse::from)
@@ -46,8 +50,7 @@ public class AccountHandler implements HttpHandler {
}
public Mono<ServerResponse> depositMoney(ServerRequest request) {
return request
.bodyToMono(DepositMoneyRequest.class)
return bodyToMono(request, DepositMoneyRequest.class, scheduler)
.map(DepositMoneyRequest::payload)
.flatMap(service::execute)
.map(DepositMoneyResponse::from)
@@ -55,8 +58,7 @@ public class AccountHandler implements HttpHandler {
}
public Mono<ServerResponse> withdrawMoney(ServerRequest request) {
return request
.bodyToMono(WithdrawMoneyRequest.class)
return bodyToMono(request, WithdrawMoneyRequest.class, scheduler)
.map(WithdrawMoneyRequest::payload)
.flatMap(service::execute)
.map(WithdrawMoneyResponse::from)

View File

@@ -2,14 +2,18 @@ package com.mz.reactor.ddd.reactorddd.application;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Hooks;
import reactor.blockhound.BlockHound;
@SpringBootApplication
public class BankAccountApp {
static {
BlockHound.install();
}
public static void main(String[] args) {
Hooks.onOperatorDebug();
// Hooks.onOperatorDebug();
SpringApplication.run(BankAccountApp.class, args);
}

View File

@@ -1,6 +1,6 @@
package com.mz.reactor.ddd.reactorddd.application;
import com.mz.reactor.ddd.common.components.http.HttpErrorHandler;
import com.mz.reactor.ddd.common.components.http.HttpHandlerFunctions;
import com.mz.reactor.ddd.reactorddd.account.api.AccountHandler;
import com.mz.reactor.ddd.reactorddd.transaction.api.TransactionHandler;
import org.apache.commons.logging.Log;
@@ -13,14 +13,22 @@ import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.Executors;
@Configuration
@ComponentScan(basePackages = {"com.mz.reactor.ddd.*"})
//@Import({AccountConfiguration.class, TransactionConfiguration.class})
public class BankAccountAppConfiguration {
private static final Log log = LogFactory.getLog(BankAccountAppConfiguration.class);
@Bean("JsonDesScheduler")
public Scheduler jsonDesScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(5));
}
@Bean
public RouterFunction<ServerResponse> statisticRoute(AccountHandler accountHandler, TransactionHandler transactionHandler) {
return RouterFunctions.route()
@@ -30,7 +38,7 @@ public class BankAccountAppConfiguration {
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just("Tick"), String.class))
.onError(Throwable.class,
(throwable, serverRequest) -> HttpErrorHandler.FN.onError(throwable, serverRequest, error -> log.error("Error: ", error)))
(throwable, serverRequest) -> HttpHandlerFunctions.FN.onError(throwable, serverRequest, error -> log.error("Error: ", error)))
.build();
}

View File

@@ -40,8 +40,7 @@ public class TransactionHandler implements HttpHandler {
}
public Mono<ServerResponse> createTransaction(ServerRequest request) {
return request
.bodyToMono(CreateTransactionRequest.class)
return bodyToMono(request, CreateTransactionRequest.class)
.map(CreateTransactionRequest::payload)
.flatMap(service::execute)
.map(CreateTransactionResponse::from)

View File

@@ -4,6 +4,8 @@ buildscript {
springBootVersion = '2.1.7.RELEASE'
springDependencyMavagementVersion = '1.0.8.RELEASE'
reactorVersion = '3.3.0.RELEASE'
immutablesVersion = '2.7.5'
jacksonVersion = '2.10.1'
}
repositories {
@@ -32,6 +34,8 @@ allprojects {
maven {
url "https://plugins.gradle.org/m2/"
}
maven { url 'https://repo.spring.io/milestone' }
maven { url 'https://repo.spring.io/snapshot' }
}
dependencyManagement {
@@ -48,6 +52,7 @@ subprojects {
dependencies {
implementation group: 'com.google.guava', name: 'guava', version: '28.1-jre'
implementation 'io.projectreactor.tools:blockhound:1.0.0.RC1'
annotationProcessor 'org.immutables:value:2.7.5'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
@@ -55,7 +60,7 @@ subprojects {
}
test {
useJUnitPlatform{
useJUnitPlatform {
includeEngines 'junit-jupiter'
}
}
@@ -183,9 +188,9 @@ project(':bank-account:transaction-domain-api') {
project(':shared-dependencies') {
dependencies {
api 'org.immutables:value:2.7.5'
api 'com.fasterxml.jackson.core:jackson-databind:2.10.1'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.1'
api "org.immutables:value:$immutablesVersion"
api "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
api "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
}
}

View File

@@ -1,23 +0,0 @@
package com.mz.reactor.ddd.common.components.http;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
public enum HttpErrorHandler {
FN;
public <E extends Throwable> Mono<ServerResponse> onError(E e, ServerRequest req, Consumer<E> logger) {
logger.accept(e);
return Mono.just(ErrorMessage.builder().error(e.getMessage()).build())
.flatMap(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(fromObject(error)));
}
}

View File

@@ -2,8 +2,10 @@ package com.mz.reactor.ddd.common.components.http;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
@@ -11,6 +13,16 @@ public interface HttpHandler {
RouterFunction<ServerResponse> route();
default <T> Mono<T> bodyToMono(ServerRequest request, Class<T> clazz, Scheduler scheduler) {
return request.bodyToMono(String.class)
.flatMap(HttpHandlerFunctions.FN.deserializeJsonString(clazz, scheduler));
}
default <T> Mono<T> bodyToMono(ServerRequest request, Class<T> clazz) {
return request.bodyToMono(String.class)
.flatMap(HttpHandlerFunctions.FN.deserializeJsonString(clazz));
}
default <T> Mono<ServerResponse> mapToResponse(T result) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8)

View File

@@ -0,0 +1,46 @@
package com.mz.reactor.ddd.common.components.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import javax.annotation.Nonnull;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
public enum HttpHandlerFunctions {
FN;
public <T> Function<String, Mono<T>> deserializeJsonString(@Nonnull Class<T> clazz, @Nonnull Scheduler scheduler) {
return value -> Mono.fromCallable(() -> {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
return mapper.readValue(value, clazz);
}).subscribeOn(scheduler);
}
public <T> Function<String, Mono<T>> deserializeJsonString(@Nonnull Class<T> clazz) {
return value -> Mono.fromCallable(() -> {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
return mapper.readValue(value, clazz);
}).subscribeOn(Schedulers.elastic());
}
public <E extends Throwable> Mono<ServerResponse> onError(E e, ServerRequest req, Consumer<E> logger) {
return Mono.fromCallable(() -> {
logger.accept(e);
return ErrorMessage.builder().error(e.getMessage()).build();
}).flatMap(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(fromObject(error)));
}
}