http api for account

This commit is contained in:
Michal Zeman
2019-11-30 20:12:15 +01:00
parent 8b1a2196d3
commit 65381f7f50
53 changed files with 864 additions and 136 deletions

View File

@@ -0,0 +1 @@
description('account api module')

View File

@@ -0,0 +1,19 @@
package com.mz.reactor.ddd.reactorddd.account.api;
import com.mz.reactor.ddd.reactorddd.account.domain.command.CreateAccount;
import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.event.AccountCreated;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyDeposited;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyWithdrawn;
import reactor.core.publisher.Mono;
public interface AccountApplicationService {
Mono<AccountCreated> execute(CreateAccount cmd);
Mono<MoneyDeposited> execute(DepositMoney cmd);
Mono<MoneyWithdrawn> execute(WithdrawMoney cmd);
}

View File

@@ -0,0 +1,62 @@
package com.mz.reactor.ddd.reactorddd.account.api;
import com.mz.reactor.ddd.common.components.http.HttpHandler;
import com.mz.reactor.ddd.reactorddd.account.api.model.*;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
@Component
public class AccountHandler implements HttpHandler {
private final AccountApplicationService service;
public AccountHandler(AccountApplicationService service) {
this.service = service;
}
public Mono<ServerResponse> createAccount(ServerRequest request) {
return request
.bodyToMono(CreateAccountRequest.class)
.map(CreateAccountRequest::payload)
.flatMap(service::execute)
.map(CreateAccountResponse::from)
.flatMap(this::mapToResponse);
}
public Mono<ServerResponse> depositMoney(ServerRequest request) {
return request
.bodyToMono(DepositMoneyRequest.class)
.map(DepositMoneyRequest::payload)
.flatMap(service::execute)
.map(DepositMoneyResponse::from)
.flatMap(this::mapToResponse);
}
public Mono<ServerResponse> withdrawMoney(ServerRequest request) {
return request
.bodyToMono(WithdrawMoneyRequest.class)
.map(WithdrawMoneyRequest::payload)
.flatMap(service::execute)
.map(WithdrawMoneyResponse::from)
.flatMap(this::mapToResponse);
}
public RouterFunction<ServerResponse> route() {
var route = RouterFunctions
.route(POST("").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::createAccount)
.andRoute(PUT("/moneys/withdraw").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::withdrawMoney)
.andRoute(PUT("/moneys/deposit").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::withdrawMoney);
return RouterFunctions.route()
.nest(path("/accounts"), () -> route)
.build();
}
}

View File

@@ -0,0 +1,4 @@
package com.mz.reactor.ddd.reactorddd.account.api;
public interface AccountQuery {
}

View File

@@ -0,0 +1,18 @@
package com.mz.reactor.ddd.reactorddd.account.api.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.command.CreateAccount;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableCreateAccountRequest.class)
@JsonDeserialize(as = ImmutableCreateAccountRequest.class)
public interface CreateAccountRequest {
CreateAccount payload();
static ImmutableCreateAccountRequest.Builder builder() {
return ImmutableCreateAccountRequest.builder();
}
}

View File

@@ -0,0 +1,22 @@
package com.mz.reactor.ddd.reactorddd.account.api.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.event.AccountCreated;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableCreateAccountResponse.class)
@JsonDeserialize(as = ImmutableCreateAccountResponse.class)
public interface CreateAccountResponse {
AccountCreated payload();
static ImmutableCreateAccountResponse.Builder builder() {
return ImmutableCreateAccountResponse.builder();
}
static CreateAccountResponse from(AccountCreated payload) {
return builder().payload(payload).build();
}
}

View File

@@ -0,0 +1,18 @@
package com.mz.reactor.ddd.reactorddd.account.api.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositMoney;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableDepositMoneyRequest.class)
@JsonDeserialize(as = ImmutableDepositMoneyRequest.class)
public interface DepositMoneyRequest {
DepositMoney payload();
static ImmutableDepositMoneyRequest.Builder builder() {
return ImmutableDepositMoneyRequest.builder();
}
}

View File

@@ -0,0 +1,22 @@
package com.mz.reactor.ddd.reactorddd.account.api.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyDeposited;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableDepositMoneyResponse.class)
@JsonDeserialize(as = ImmutableDepositMoneyResponse.class)
public interface DepositMoneyResponse {
MoneyDeposited payload();
static ImmutableDepositMoneyResponse.Builder builder() {
return ImmutableDepositMoneyResponse.builder();
}
static DepositMoneyResponse from(MoneyDeposited payload) {
return builder().payload(payload).build();
}
}

View File

@@ -0,0 +1,18 @@
package com.mz.reactor.ddd.reactorddd.account.api.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableWithdrawMoneyRequest.class)
@JsonDeserialize(as = ImmutableWithdrawMoneyRequest.class)
public interface WithdrawMoneyRequest {
WithdrawMoney payload();
static ImmutableWithdrawMoneyRequest.Builder builder() {
return ImmutableWithdrawMoneyRequest.builder();
}
}

View File

@@ -0,0 +1,22 @@
package com.mz.reactor.ddd.reactorddd.account.api.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyWithdrawn;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableWithdrawMoneyResponse.class)
@JsonDeserialize(as = ImmutableWithdrawMoneyResponse.class)
public interface WithdrawMoneyResponse {
MoneyWithdrawn payload();
static ImmutableWithdrawMoneyResponse.Builder builder() {
return ImmutableWithdrawMoneyResponse.builder();
}
static WithdrawMoneyResponse from(MoneyWithdrawn moneyWithdrawn) {
return builder().payload(moneyWithdrawn).build();
}
}

View File

@@ -1,10 +1,14 @@
package com.mz.reactor.ddd.reactorddd.account.domain.command; package com.mz.reactor.ddd.reactorddd.account.domain.command;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.immutables.value.Value; import org.immutables.value.Value;
import java.math.BigDecimal; import java.math.BigDecimal;
@Value.Immutable @Value.Immutable
@JsonSerialize(as = ImmutableCreateAccount.class)
@JsonDeserialize(as = ImmutableCreateAccount.class)
public interface CreateAccount extends AccountCommand { public interface CreateAccount extends AccountCommand {
BigDecimal balance(); BigDecimal balance();

View File

@@ -1,10 +1,14 @@
package com.mz.reactor.ddd.reactorddd.account.domain.command; package com.mz.reactor.ddd.reactorddd.account.domain.command;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.immutables.value.Value; import org.immutables.value.Value;
import java.math.BigDecimal; import java.math.BigDecimal;
@Value.Immutable @Value.Immutable
@JsonSerialize(as = ImmutableDepositMoney.class)
@JsonDeserialize(as = ImmutableDepositMoney.class)
public interface DepositMoney extends AccountCommand { public interface DepositMoney extends AccountCommand {
BigDecimal amount(); BigDecimal amount();

View File

@@ -1,10 +1,15 @@
package com.mz.reactor.ddd.reactorddd.account.domain.command; package com.mz.reactor.ddd.reactorddd.account.domain.command;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.event.ImmutableMoneyWithdrawn;
import org.immutables.value.Value; import org.immutables.value.Value;
import java.math.BigDecimal; import java.math.BigDecimal;
@Value.Immutable @Value.Immutable
@JsonSerialize(as = ImmutableWithdrawMoney.class)
@JsonDeserialize(as = ImmutableWithdrawMoney.class)
public interface WithdrawMoney extends AccountCommand { public interface WithdrawMoney extends AccountCommand {
BigDecimal amount(); BigDecimal amount();

View File

@@ -1,11 +1,15 @@
package com.mz.reactor.ddd.reactorddd.account.domain.event; package com.mz.reactor.ddd.reactorddd.account.domain.event;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.command.CreateAccount; import com.mz.reactor.ddd.reactorddd.account.domain.command.CreateAccount;
import org.immutables.value.Value; import org.immutables.value.Value;
import java.math.BigDecimal; import java.math.BigDecimal;
@Value.Immutable @Value.Immutable
@JsonSerialize(as = ImmutableAccountCreated.class)
@JsonDeserialize(as = ImmutableAccountCreated.class)
public interface AccountCreated extends AccountEvent { public interface AccountCreated extends AccountEvent {
BigDecimal balance(); BigDecimal balance();

View File

@@ -1,10 +1,14 @@
package com.mz.reactor.ddd.reactorddd.account.domain.event; package com.mz.reactor.ddd.reactorddd.account.domain.event;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.immutables.value.Value; import org.immutables.value.Value;
import java.math.BigDecimal; import java.math.BigDecimal;
@Value.Immutable @Value.Immutable
@JsonSerialize(as = ImmutableMoneyDeposited.class)
@JsonDeserialize(as = ImmutableMoneyDeposited.class)
public interface MoneyDeposited extends AccountEvent { public interface MoneyDeposited extends AccountEvent {
BigDecimal amount(); BigDecimal amount();

View File

@@ -1,11 +1,15 @@
package com.mz.reactor.ddd.reactorddd.account.domain.event; package com.mz.reactor.ddd.reactorddd.account.domain.event;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney;
import org.immutables.value.Value; import org.immutables.value.Value;
import java.math.BigDecimal; import java.math.BigDecimal;
@Value.Immutable @Value.Immutable
@JsonSerialize(as = ImmutableMoneyWithdrawn.class)
@JsonDeserialize(as = ImmutableMoneyWithdrawn.class)
public interface MoneyWithdrawn extends AccountEvent { public interface MoneyWithdrawn extends AccountEvent {
BigDecimal amount(); BigDecimal amount();

View File

@@ -0,0 +1 @@
description('account implementation of infrastructure layer')

View File

@@ -0,0 +1,46 @@
package com.mz.reactor.ddd.reactorddd.account.impl;
import com.mz.reactor.ddd.common.components.bus.ApplicationMessageBus;
import com.mz.reactor.ddd.reactorddd.account.api.AccountApplicationService;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountAggregate;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountState;
import com.mz.reactor.ddd.reactorddd.account.domain.command.AccountCommand;
import com.mz.reactor.ddd.reactorddd.account.domain.command.CreateAccount;
import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.event.AccountCreated;
import com.mz.reactor.ddd.reactorddd.account.domain.event.AccountEvent;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyDeposited;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyWithdrawn;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateFacade;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class AccountApplicationServiceImpl implements AccountApplicationService {
private final AggregateFacade<AccountAggregate, AccountCommand, AccountEvent, AccountState> aggregateFacade;
public AccountApplicationServiceImpl(
ApplicationMessageBus bus,
@Qualifier("accountAggregateFacade") AggregateFacade<AccountAggregate, AccountCommand, AccountEvent, AccountState> aggregateFacade
) {
this.aggregateFacade = aggregateFacade;
}
@Override
public Mono<AccountCreated> execute(CreateAccount cmd) {
return this.aggregateFacade.executeReturnEvent(cmd, cmd.aggregateId()).cast(AccountCreated.class);
}
@Override
public Mono<MoneyDeposited> execute(DepositMoney cmd) {
return this.aggregateFacade.executeReturnEvent(cmd, cmd.aggregateId()).cast(MoneyDeposited.class);
}
@Override
public Mono<MoneyWithdrawn> execute(WithdrawMoney cmd) {
return this.aggregateFacade.executeReturnEvent(cmd, cmd.aggregateId()).cast(MoneyWithdrawn.class);
}
}

View File

@@ -0,0 +1,46 @@
package com.mz.reactor.ddd.reactorddd.account.wiring;
import com.mz.reactor.ddd.common.api.valueobject.Id;
import com.mz.reactor.ddd.common.components.bus.ApplicationMessageBus;
import com.mz.reactor.ddd.common.components.bus.impl.ApplicationMessageBusImpl;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountAggregate;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountCommandHandler;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountEventApplier;
import com.mz.reactor.ddd.reactorddd.account.domain.AccountState;
import com.mz.reactor.ddd.reactorddd.account.domain.command.AccountCommand;
import com.mz.reactor.ddd.reactorddd.account.domain.event.AccountEvent;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateFacade;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateFacadeImpl;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateRepositoryImpl;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.util.function.Function;
@Configuration
public class AccountConfiguration {
public static final String ACCOUNT_AGGREGATE_REPOSITORY = "accountAggregateRepository";
private final AccountEventApplier accountEventApplier = new AccountEventApplier();
private final AccountCommandHandler accountCommandHandler = new AccountCommandHandler();
private final Function<Id, AccountAggregate> aggregateFactory = id -> new AccountAggregate(id.toString());
private final Function<AccountAggregate, AccountState> stateFactory = AccountAggregate::getState;
@Bean(ACCOUNT_AGGREGATE_REPOSITORY)
public AggregateRepository<AccountAggregate, AccountCommand, AccountEvent, AccountState> getAggregateRepository() {
return new AggregateRepositoryImpl<>(accountCommandHandler, accountEventApplier, aggregateFactory, stateFactory);
}
@Bean("accountAggregateFacade")
public AggregateFacade<AccountAggregate, AccountCommand, AccountEvent, AccountState> getAggregateFacade(
@Qualifier(ACCOUNT_AGGREGATE_REPOSITORY) AggregateRepository aggregateRepository,
ApplicationMessageBus bus
) {
return new AggregateFacadeImpl<>(aggregateRepository, bus::publishMessage, s -> {});
}
}

View File

@@ -0,0 +1,31 @@
package com.mz.reactor.ddd.reactorddd.application;
import com.mz.reactor.ddd.common.components.http.HttpErrorHandler;
import com.mz.reactor.ddd.reactorddd.account.api.AccountHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Configuration
@ComponentScan(basePackages = {"com.mz.reactor.ddd.*"})
@Import({com.mz.reactor.ddd.reactorddd.account.wiring.AccountConfiguration.class})
public class BankAccountAppConfiguration {
@Bean
public RouterFunction<ServerResponse> statisticRoute(AccountHandler handler) {
return RouterFunctions.route()
.add(handler.route())
.GET("/health", req -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just("Tick"), String.class))
.onError(Throwable.class, HttpErrorHandler.FN::onError)
.build();
}
}

View File

@@ -79,7 +79,11 @@ project(':bank-account:bank-account-application') {
dependencies { dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-webflux'
// https://mvnrepository.com/artifact/io.projectreactor/reactor-core implementation project(':common-api')
implementation project(':common-components')
implementation project(':common-persistence')
implementation project(':bank-account:account-impl')
implementation project(':bank-account:account-api')
implementation project(':bank-account:account-domain') implementation project(':bank-account:account-domain')
implementation project(':bank-account:transaction-domain') implementation project(':bank-account:transaction-domain')
testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.boot:spring-boot-starter-test'
@@ -100,6 +104,27 @@ project(':bank-account:account-domain-api') {
} }
} }
project(':bank-account:account-api') {
dependencies {
api project(':common-api')
api project(':common-components')
api project(':shared-spring-dependecies')
api project(':bank-account:account-domain-api')
}
}
project(':bank-account:account-impl') {
dependencies {
api project(':common-api')
api project(':common-components')
api project(':common-persistence')
api project(':shared-spring-dependecies')
implementation project(':bank-account:account-domain-api')
implementation project(':bank-account:account-api')
implementation project(':bank-account:account-domain')
}
}
project(':bank-account:transaction-domain') { project(':bank-account:transaction-domain') {
dependencies { dependencies {
api project(':common-api') api project(':common-api')
@@ -116,11 +141,23 @@ project(':bank-account:transaction-domain-api') {
project(':shared-dependencies') { project(':shared-dependencies') {
dependencies { dependencies {
api 'org.immutables:value:2.7.5' api 'org.immutables:value:2.7.5'
api 'com.fasterxml.jackson.core:jackson-databind:2.9.8'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.9' api group: 'org.apache.commons', name: 'commons-lang3', version: '3.9'
} }
} }
project(':common-persistance') { project(':shared-spring-dependecies') {
apply plugin: 'io.spring.dependency-management'
dependencies {
api group: 'io.projectreactor', name: 'reactor-core'
api 'org.springframework:spring-context'
api 'org.springframework:spring-webflux'
api 'org.springframework:spring-core'
}
}
project(':common-persistence') {
dependencies { dependencies {
api project(':common-api') api project(':common-api')
@@ -134,8 +171,7 @@ project(':common-components') {
dependencies { dependencies {
api project(':common-api') api project(':common-api')
implementation group: 'io.projectreactor', name: 'reactor-core' api project(':shared-spring-dependecies')
implementation 'org.springframework:spring-context'
testImplementation 'io.projectreactor:reactor-test' testImplementation 'io.projectreactor:reactor-test'
} }
} }

View File

@@ -1,8 +1,10 @@
package com.mz.reactor.ddd.common.api.command; package com.mz.reactor.ddd.common.api.command;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
@FunctionalInterface @FunctionalInterface
public interface CommandHandler<A, C extends Command> { public interface CommandHandler<A, C extends Command> {
CommandResult execute(A aggregate, C command); <E extends DomainEvent> CommandResult<E> execute(A aggregate, C command);
} }

View File

@@ -0,0 +1,15 @@
package com.mz.reactor.ddd.common.api.command;
import org.immutables.value.Value;
@Value.Immutable
public interface CommandResponse<S> {
CommandResult result();
S state();
static ImmutableCommandResponse.Builder builder() {
return ImmutableCommandResponse.builder();
}
}

View File

@@ -8,7 +8,7 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@Value.Immutable @Value.Immutable
public interface CommandResult { public interface CommandResult<E extends DomainEvent> {
enum StatusCode { enum StatusCode {
OK, OK,
@@ -21,7 +21,7 @@ public interface CommandResult {
StatusCode statusCode(); StatusCode statusCode();
List<DomainEvent> events(); List<E> events();
Optional<RuntimeException> error(); Optional<RuntimeException> error();
@@ -29,18 +29,18 @@ public interface CommandResult {
return ImmutableCommandResult.builder(); return ImmutableCommandResult.builder();
} }
static CommandResult fromError(RuntimeException error, DomainEvent event, Command command) { static <D extends DomainEvent> CommandResult<D> fromError(RuntimeException error, D event, Command command) {
return builder() return builder()
.commandId(Optional.ofNullable(command) .commandId(Optional.ofNullable(command)
.map(Command::commandId) .map(Command::commandId)
.orElseGet(() -> UUID.randomUUID().toString())) .orElseGet(() -> UUID.randomUUID().toString()))
.statusCode(StatusCode.BAD_COMMAND) .statusCode(StatusCode.BAD_COMMAND)
.events(Optional.ofNullable(event).map(List::of).orElseGet(() -> List.of())) .events(Optional.ofNullable(event).map(List::of).orElseGet(List::of))
.error(error) .error(error)
.build(); .build();
} }
static CommandResult badCommand(Command cmd) { static <D extends DomainEvent> CommandResult<D> badCommand(Command cmd) {
return builder() return builder()
.commandId(Optional.ofNullable(cmd) .commandId(Optional.ofNullable(cmd)
.map(Command::commandId) .map(Command::commandId)

View File

@@ -0,0 +1,17 @@
package com.mz.reactor.ddd.common.api.error;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.immutables.value.Value;
@Value.Immutable
@JsonSerialize(as = ImmutableErrorMessage.class)
@JsonDeserialize(as = ImmutableErrorMessage.class)
public interface ErrorMessage {
String error();
static ImmutableErrorMessage.Builder builder() {
return ImmutableErrorMessage.builder();
}
}

View File

@@ -1,7 +1,7 @@
package com.mz.reactor.ddd.common.api.event; package com.mz.reactor.ddd.common.api.event;
@FunctionalInterface @FunctionalInterface
public interface EventApplier<A, E> { public interface EventApplier<A, E extends DomainEvent> {
A apply(A aggregate, E event); A apply(A aggregate, E event);

View File

@@ -0,0 +1,5 @@
package com.mz.reactor.ddd.common.api.view;
public interface DomainView {
String id();
}

View File

@@ -1,4 +1,4 @@
package com.mz.reactor.ddd.common.components; package com.mz.reactor.ddd.common.components.bus;
import com.mz.reactor.ddd.common.api.Message; import com.mz.reactor.ddd.common.api.Message;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

View File

@@ -1,8 +1,9 @@
package com.mz.reactor.ddd.common.components.impl; package com.mz.reactor.ddd.common.components.bus.impl;
import com.mz.reactor.ddd.common.api.Message; import com.mz.reactor.ddd.common.api.Message;
import com.mz.reactor.ddd.common.components.ApplicationMessageBus; import com.mz.reactor.ddd.common.components.bus.ApplicationMessageBus;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.ReplayProcessor;
@@ -10,7 +11,7 @@ import reactor.core.scheduler.Schedulers;
import java.util.Optional; import java.util.Optional;
@Component @Service
public class ApplicationMessageBusImpl implements ApplicationMessageBus { public class ApplicationMessageBusImpl implements ApplicationMessageBus {
private final ReplayProcessor<Message> messages = ReplayProcessor.create(1); private final ReplayProcessor<Message> messages = ReplayProcessor.create(1);

View File

@@ -0,0 +1,20 @@
package com.mz.reactor.ddd.common.components.http;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.immutables.value.Value;
/**
* Created by zemi on 02/10/2018.
*/
@Value.Immutable
@JsonSerialize(as = ImmutableErrorMessage.class)
@JsonDeserialize(as = ImmutableErrorMessage.class)
public interface ErrorMessage {
String error();
static ImmutableErrorMessage.Builder builder() {
return ImmutableErrorMessage.builder();
}
}

View File

@@ -0,0 +1,20 @@
package com.mz.reactor.ddd.common.components.http;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
public enum HttpErrorHandler {
FN;
public <E extends Throwable> Mono<ServerResponse> onError(E e, ServerRequest req) {
return Mono.just(ErrorMessage.builder().error(e.getMessage()).build())
.flatMap(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(fromObject(error)));
}
}

View File

@@ -0,0 +1,16 @@
package com.mz.reactor.ddd.common.components.http;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
public interface HttpHandler {
default <T> Mono<ServerResponse> mapToResponse(T result) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8).body(fromObject(result));
}
}

View File

@@ -1,16 +0,0 @@
package com.mz.reactor.ddd.reactorddd.persistance.view;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.function.Predicate;
public interface ViewRepository<S> {
Mono<S> findBy(Predicate<S> query);
Mono<List<S>> findAllByList(Predicate<S> query);
abstract Flux<S> findAllBy(Predicate<S> query);
}

View File

@@ -1,50 +0,0 @@
package com.mz.reactor.ddd.reactorddd.persistance.view;
import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
public class ViewRepositoryImpl<S> implements ViewRepository<S> {
private Set<S> storage = new LinkedHashSet<>();
UnicastProcessor<S> storageProcessor = UnicastProcessor.create();
FluxSink<S> sink = storageProcessor.sink();
public ViewRepositoryImpl() {
storageProcessor
.subscribeOn(Schedulers.single())
.subscribe(view -> storage.add(view));
}
public void addView(S view) {
sink.next(view);
}
@Override
public Mono<S> findBy(Predicate<S> query) {
return Mono.just(query)
.flatMap(q -> storage.stream()
.filter(q)
.findFirst()
.map(Mono::just)
.orElseGet(Mono::empty)
);
}
@Override
public Mono<List<S>> findAllByList(Predicate<S> query) {
return findAllBy(query).collectList();
}
@Override
public Flux<S> findAllBy(Predicate<S> query) {
return Flux.fromStream(storage.stream()
.filter(query));
}
}

View File

@@ -2,16 +2,17 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate;
import com.mz.reactor.ddd.common.api.command.Command; import com.mz.reactor.ddd.common.api.command.Command;
import com.mz.reactor.ddd.common.api.command.CommandResult; import com.mz.reactor.ddd.common.api.command.CommandResult;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.function.Function; import java.util.function.Function;
public interface AggregateActor<A, C extends Command> { public interface AggregateActor<A, C extends Command, E extends DomainEvent> {
<S> Mono<S> getState(Function<A, S> stateFactory); <S> Mono<S> getState(Function<A, S> stateFactory);
void onDestroy(); void onDestroy();
Mono<CommandResult> execute(C cmd); Mono<CommandResult<E>> execute(C cmd);
} }

View File

@@ -0,0 +1,12 @@
package com.mz.reactor.ddd.reactorddd.persistance.aggregate;
import com.mz.reactor.ddd.common.api.command.Command;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import reactor.core.publisher.Mono;
public interface AggregateFacade<A, C extends Command, E extends DomainEvent,S> {
Mono<S> execute(C command, String aggregateID);
Mono<E> executeReturnEvent(C command, String aggregateID);
}

View File

@@ -2,12 +2,13 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate;
import com.mz.reactor.ddd.common.api.command.Command; import com.mz.reactor.ddd.common.api.command.Command;
import com.mz.reactor.ddd.common.api.command.CommandResult; import com.mz.reactor.ddd.common.api.command.CommandResult;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.valueobject.Id; import com.mz.reactor.ddd.common.api.valueobject.Id;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface AggregateRepository<A, C extends Command, S> { public interface AggregateRepository<A, C extends Command, E extends DomainEvent,S> {
Mono<CommandResult> execute(C cmd, Id aggregateId); Mono<CommandResult<E>> execute(C cmd, Id aggregateId);
Mono<S> findById(Id id); Mono<S> findById(Id id);
} }

View File

@@ -19,15 +19,15 @@ import java.util.Objects;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
public class AggregateActorImpl<A, C extends Command> implements AggregateActor<A, C> { public class AggregateActorImpl<A, C extends Command, E extends DomainEvent> implements AggregateActor<A, C, E> {
private final Id id; private final Id id;
private final CommandHandler<A, C> commandHandler; private final CommandHandler<A, C> commandHandler;
private final EventApplier<A, DomainEvent> eventApplier; private final EventApplier<A, E> eventApplier;
private final BiFunction<Id, List<DomainEvent>, List<DomainEvent>> persistAll; private final BiFunction<Id, List<E>, List<E>> persistAll;
private A aggregate; private A aggregate;
@@ -35,17 +35,17 @@ public class AggregateActorImpl<A, C extends Command> implements AggregateActor<
private final FluxSink<C> commandSink = commandProcessor.sink(); private final FluxSink<C> commandSink = commandProcessor.sink();
private final ReplayProcessor<CommandResult> commandResultReplayProcessor = ReplayProcessor.create(); private final ReplayProcessor<CommandResult<E>> commandResultReplayProcessor = ReplayProcessor.create();
private final FluxSink<CommandResult> commandResultSink = commandResultReplayProcessor.sink(); private final FluxSink<CommandResult<E>> commandResultSink = commandResultReplayProcessor.sink();
public AggregateActorImpl( public AggregateActorImpl(
Id id, Id id,
CommandHandler<A, C> commandHandler, CommandHandler<A, C> commandHandler,
EventApplier<A, DomainEvent> eventApplier, EventApplier<A, E> eventApplier,
Function<Id, A> aggregateFactory, Function<Id, A> aggregateFactory,
List<DomainEvent> domainEvents, List<E> domainEvents,
BiFunction<Id, List<DomainEvent>, List<DomainEvent>> persistAll BiFunction<Id, List<E>, List<E>> persistAll
) { ) {
this.id = id; this.id = id;
this.commandHandler = commandHandler; this.commandHandler = commandHandler;
@@ -64,18 +64,18 @@ public class AggregateActorImpl<A, C extends Command> implements AggregateActor<
} }
private void handleCommand(C cmd) { private void handleCommand(C cmd) {
var commandResult = commandHandler.execute(this.aggregate, cmd); var commandResult = commandHandler.<E>execute(this.aggregate, cmd);
this.aggregate = persistAll this.aggregate = (A) persistAll
.andThen(events -> events.stream() .andThen(events -> events.stream()
.reduce(this.aggregate, eventApplier::apply, (a1, a2) -> a2)) .reduce(this.aggregate, eventApplier::apply, (a1, a2) -> a2))
.apply(id, commandResult.events()); .apply(id, commandResult.events());
commandResultSink.next(commandResult); commandResultSink.next(commandResult);
} }
public Mono<CommandResult> execute(C cmd) { @Override
Mono<CommandResult> result = commandResultReplayProcessor.publishOn(Schedulers.elastic()) public Mono<CommandResult<E>> execute(C cmd) {
Mono<CommandResult<E>> result = commandResultReplayProcessor.publishOn(Schedulers.elastic())
.filter(r -> r.commandId().equals(cmd.commandId())) .filter(r -> r.commandId().equals(cmd.commandId()))
.cast(CommandResult.class)
.next(); .next();
commandSink.next(cmd); commandSink.next(cmd);
return result.timeout(Duration.ofSeconds(10)); return result.timeout(Duration.ofSeconds(10));
@@ -100,7 +100,7 @@ public class AggregateActorImpl<A, C extends Command> implements AggregateActor<
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
AggregateActorImpl that = (AggregateActorImpl) o; var that = (AggregateActorImpl) o;
return id.equals(that.id); return id.equals(that.id);
} }

View File

@@ -0,0 +1,60 @@
package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl;
import com.mz.reactor.ddd.common.api.Message;
import com.mz.reactor.ddd.common.api.command.Command;
import com.mz.reactor.ddd.common.api.command.CommandResult;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.valueobject.Id;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateFacade;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;
import java.util.function.Function;
public class AggregateFacadeImpl<A, C extends Command, E extends DomainEvent ,S> implements AggregateFacade<A, C, E, S> {
private final AggregateRepository<A, C, E, S> aggregateRepository;
private final Consumer<Message> publishChanged;
private final Consumer<S> publishDocument;
public AggregateFacadeImpl(
AggregateRepository<A, C, E, S> aggregateRepository,
Consumer<Message> publishChanged,
Consumer<S> publishDocument
) {
this.aggregateRepository = aggregateRepository;
this.publishChanged = publishChanged;
this.publishDocument = publishDocument;
}
@Override
public Mono<S> execute(C command, String aggregateID) {
return aggregateRepository.execute(command, new Id(aggregateID))
.flatMap(processResult(aggregateID));
}
@Override
public Mono<E> executeReturnEvent(C command, String aggregateID) {
var result = aggregateRepository.execute(command, new Id(aggregateID));
return result.map(r -> r.events().stream().findAny().get());
}
private Function<CommandResult<E>, Mono<S>> processResult(String aggregateId) {
return result -> {
switch (result.statusCode()) {
case OK:
return aggregateRepository.findById(new Id(aggregateId))
.doOnSuccess(publishDocument)
.doOnSuccess(s -> result.events().forEach(publishChanged));
case FAILED:
return Mono.error(result.error().orElseGet(() -> new RuntimeException("Generic error")));
case NOT_MODIFIED:
default:
return Mono.empty();
}
};
}
}

View File

@@ -21,21 +21,19 @@ import java.util.stream.Stream;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
public class AggregateRepositoryImpl<A, C extends Command, S> implements AggregateRepository<A, C, S> { public class AggregateRepositoryImpl<A, C extends Command, E extends DomainEvent,S> implements AggregateRepository<A, C, E, S> {
// private final Map<Id, List<DomainEvent>> eventSource = new HashMap<>(); private final AtomicReference<Map<Id, List<E>>> eventSource = new AtomicReference<>(new HashMap<>());
private final AtomicReference<Map<Id, List<DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>()); private final Cache<Id, AggregateActor<A, C, E>> cache = CacheBuilder.newBuilder()
private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10)) .expireAfterAccess(Duration.ofMinutes(10))
.removalListener((RemovalListener<Id, AggregateActor>) notification -> notification.getValue().onDestroy()) .removalListener((RemovalListener<Id, AggregateActor<A, C, E>>) notification -> notification.getValue().onDestroy())
.build(); .build();
private final CommandHandler<A, C> commandHandler; private final CommandHandler<A, C> commandHandler;
private final EventApplier<A, DomainEvent> eventApplier; private final EventApplier<A, E> eventApplier;
private final Function<Id, A> aggregateFactory; private final Function<Id, A> aggregateFactory;
@@ -43,7 +41,7 @@ public class AggregateRepositoryImpl<A, C extends Command, S> implements Aggrega
public AggregateRepositoryImpl( public AggregateRepositoryImpl(
CommandHandler<A, C> commandHandler, CommandHandler<A, C> commandHandler,
EventApplier<A, DomainEvent> eventApplier, EventApplier<A, E> eventApplier,
Function<Id, A> aggregateFactory, Function<Id, A> aggregateFactory,
Function<A, S> stateFactory Function<A, S> stateFactory
) { ) {
@@ -53,7 +51,7 @@ public class AggregateRepositoryImpl<A, C extends Command, S> implements Aggrega
this.stateFactory = stateFactory; this.stateFactory = stateFactory;
} }
private List<DomainEvent> persistAll(Id id, List<DomainEvent> events) { private List<E> persistAll(Id id, List<E> events) {
eventSource.updateAndGet(esMap -> { eventSource.updateAndGet(esMap -> {
var eventsToStore = Optional.ofNullable(esMap.get(id)) var eventsToStore = Optional.ofNullable(esMap.get(id))
.map(es -> Stream.concat(es.stream(), events.stream()) .map(es -> Stream.concat(es.stream(), events.stream())
@@ -66,12 +64,12 @@ public class AggregateRepositoryImpl<A, C extends Command, S> implements Aggrega
return events; return events;
} }
private Mono<AggregateActor<A, C>> getAggregate(Id id) { private Mono<AggregateActor<A, C, E>> getAggregate(Id id) {
return Mono.just(id) return Mono.just(id)
.map(this::getFromCache); .map(this::getFromCache);
} }
private AggregateActor<A, C> getFromCache(Id id) { private AggregateActor<A, C, E> getFromCache(Id id) {
try { try {
return cache.get(id, () -> return cache.get(id, () ->
new AggregateActorImpl<>( new AggregateActorImpl<>(
@@ -87,7 +85,7 @@ public class AggregateRepositoryImpl<A, C extends Command, S> implements Aggrega
} }
@Override @Override
public Mono<CommandResult> execute(C cmd, Id aggregateId) { public Mono<CommandResult<E>> execute(C cmd, Id aggregateId) {
return getAggregate(aggregateId) return getAggregate(aggregateId)
.flatMap(a -> a.execute(cmd)); .flatMap(a -> a.execute(cmd));
} }

View File

@@ -0,0 +1,19 @@
package com.mz.reactor.ddd.reactorddd.persistance.view.impl;
import com.mz.reactor.ddd.common.api.view.DomainView;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.function.Predicate;
public interface ViewRepository<S extends DomainView> {
void addView(S view);
Mono<S> findBy(Predicate<S> query);
Mono<List<S>> findAllListBy(Predicate<S> query);
Flux<S> findAllBy(Predicate<S> query);
}

View File

@@ -0,0 +1,57 @@
package com.mz.reactor.ddd.reactorddd.persistance.view.impl.impl;
import com.mz.reactor.ddd.common.api.view.DomainView;
import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
public class ViewRepositoryImpl<S extends DomainView> implements ViewRepository<S> {
private final AtomicReference<Map<String, S>> storage = new AtomicReference<>(new HashMap<>());
UnicastProcessor<S> storageProcessor = UnicastProcessor.create();
FluxSink<S> sink = storageProcessor.sink();
public ViewRepositoryImpl() {
storageProcessor
.parallel()
.subscribe(view -> storage.updateAndGet(sm -> {
sm.put(view.id(), view);
return sm;
}));
}
@Override
public void addView(S view) {
sink.next(view);
}
@Override
public Mono<S> findBy(Predicate<S> query) {
return findAllBy(query)
.singleOrEmpty();
}
@Override
public Mono<List<S>> findAllListBy(Predicate<S> query) {
return findAllBy(query).collectList();
}
@Override
public Flux<S> findAllBy(Predicate<S> query) {
return Optional.ofNullable(query)
.map(Flux.fromStream(storage.get().values().stream())::filter)
.orElse(Flux.empty());
}
}

View File

@@ -22,7 +22,7 @@ class AggregateActorTest {
.withValue(10) .withValue(10)
.build(); .build();
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>( var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
new Id(UUID.randomUUID().toString()), new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler, TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier, TestFunctions.FN.eventApplier,
@@ -31,7 +31,7 @@ class AggregateActorTest {
TestFunctions.FN.persistAll TestFunctions.FN.persistAll
); );
Mono<CommandResult> result = subject.execute(command); Mono<CommandResult<TestAggregateEvent>> result = subject.execute(command);
StepVerifier.create(result) StepVerifier.create(result)
.expectNextMatches(r -> .expectNextMatches(r ->
r.commandId().equals(commandId) && r.statusCode().equals(CommandResult.StatusCode.OK)) r.commandId().equals(commandId) && r.statusCode().equals(CommandResult.StatusCode.OK))
@@ -40,7 +40,7 @@ class AggregateActorTest {
@Test @Test
void onDestroy() { void onDestroy() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>( var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
new Id(UUID.randomUUID().toString()), new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler, TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier, TestFunctions.FN.eventApplier,
@@ -60,7 +60,7 @@ class AggregateActorTest {
@Test @Test
void executeParallel() { void executeParallel() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>( var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
new Id(UUID.randomUUID().toString()), new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler, TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier, TestFunctions.FN.eventApplier,
@@ -84,7 +84,7 @@ class AggregateActorTest {
@Test @Test
public void testRecoveryState() { public void testRecoveryState() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>( var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
new Id(UUID.randomUUID().toString()), new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler, TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier, TestFunctions.FN.eventApplier,
@@ -105,7 +105,7 @@ class AggregateActorTest {
@Test @Test
public void test_RecoveryStateAndExecuteCommand() { public void test_RecoveryStateAndExecuteCommand() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>( var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
new Id(UUID.randomUUID().toString()), new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler, TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier, TestFunctions.FN.eventApplier,

View File

@@ -18,7 +18,7 @@ class AggregateRepositoryImplTest {
private final Function<TestAggregate, Tuple2<Id, Long>> getState = a -> Tuples.of(a.getId(), a.getValue()); private final Function<TestAggregate, Tuple2<Id, Long>> getState = a -> Tuples.of(a.getId(), a.getValue());
AggregateRepositoryImpl<TestAggregate, TestAggregateCommand, Tuple2<Id, Long>> subject = AggregateRepositoryImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent,Tuple2<Id, Long>> subject =
new AggregateRepositoryImpl<>( new AggregateRepositoryImpl<>(
TestFunctions.FN.commandHandler, TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier, TestFunctions.FN.eventApplier,

View File

@@ -2,7 +2,6 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl;
import com.mz.reactor.ddd.common.api.command.CommandHandler; import com.mz.reactor.ddd.common.api.command.CommandHandler;
import com.mz.reactor.ddd.common.api.command.CommandResult; import com.mz.reactor.ddd.common.api.command.CommandResult;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.event.EventApplier; import com.mz.reactor.ddd.common.api.event.EventApplier;
import com.mz.reactor.ddd.common.api.valueobject.Id; import com.mz.reactor.ddd.common.api.valueobject.Id;
@@ -13,28 +12,31 @@ import java.util.function.Function;
public enum TestFunctions { public enum TestFunctions {
FN; FN;
public final CommandHandler<TestAggregate, TestAggregateCommand> commandHandler = (aggregate, command) -> { public final CommandHandler<TestAggregate, TestAggregateCommand> commandHandler = new CommandHandler<TestAggregate, TestAggregateCommand>() {
try { @Override
var event = aggregate.validate(command); public CommandResult<TestAggregateEvent> execute(TestAggregate aggregate, TestAggregateCommand command) {
return CommandResult.builder() try {
.commandId(command.commandId()) TestAggregateEvent event = aggregate.validate(command);
.statusCode(CommandResult.StatusCode.OK) return CommandResult.builder()
.addEvents(event) .commandId(command.commandId())
.build(); .statusCode(CommandResult.StatusCode.OK)
} catch (Exception e) { .addEvents(event)
return CommandResult.fromError( .build();
new RuntimeException(e), } catch (Exception e) {
null, return CommandResult.fromError(
command new RuntimeException(e),
); null,
command
);
}
} }
}; };
public final EventApplier<TestAggregate, DomainEvent> eventApplier = (aggregate, event) -> aggregate.apply((TestAggregateEvent) event); public final EventApplier<TestAggregate, TestAggregateEvent> eventApplier = (aggregate, event) -> aggregate.apply((TestAggregateEvent) event);
public final Function<Id, TestAggregate> aggregateFactory = TestAggregate::new; public final Function<Id, TestAggregate> aggregateFactory = TestAggregate::new;
public final BiFunction<Id, List<DomainEvent>, List<DomainEvent>> persistAll = (id, events) -> events; public final BiFunction<Id, List<TestAggregateEvent>, List<TestAggregateEvent>> persistAll = (id, events) -> events;
public Function<String, String> mapTest1(String val1, String val2) { public Function<String, String> mapTest1(String val1, String val2) {
return v -> val1 + " and " + val2 + " not " + v; return v -> val1 + " and " + val2 + " not " + v;

View File

@@ -0,0 +1,80 @@
package com.mz.reactor.ddd.reactorddd.persistance.model;
import com.mz.reactor.ddd.common.api.view.DomainView;
public class TestView implements DomainView {
private final String id;
private final String value;
private TestView(Builder builder) {
id = builder.id;
value = builder.value;
}
public String getId() {
return id;
}
public String getValue() {
return value;
}
@Override
public String id() {
return id;
}
public static Builder newBuilder() {
return new Builder();
}
public static Builder newBuilder(TestView copy) {
Builder builder = new Builder();
builder.id = copy.getId();
builder.value = copy.getValue();
return builder;
}
/**
* {@code TestView} builder static inner class.
*/
public static final class Builder {
private String id;
private String value;
private Builder() {
}
/**
* Sets the {@code id} and returns a reference to this Builder so that the methods can be chained together.
*
* @param val the {@code id} to set
* @return a reference to this Builder
*/
public Builder withId(String val) {
id = val;
return this;
}
/**
* Sets the {@code value} and returns a reference to this Builder so that the methods can be chained together.
*
* @param val the {@code value} to set
* @return a reference to this Builder
*/
public Builder withValue(String val) {
value = val;
return this;
}
/**
* Returns a {@code TestView} built from the parameters previously set.
*
* @return a {@code TestView} built with parameters of this {@code TestView.Builder}
*/
public TestView build() {
return new TestView(this);
}
}
}

View File

@@ -0,0 +1,74 @@
package com.mz.reactor.ddd.reactorddd.persistance.view.impl;
import com.mz.reactor.ddd.reactorddd.persistance.model.TestView;
import com.mz.reactor.ddd.reactorddd.persistance.view.impl.impl.ViewRepositoryImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.util.UUID;
import java.util.function.Predicate;
class ViewRepositoryImplTest {
ViewRepository<TestView> subject;
private final String id1 = UUID.randomUUID().toString();
private final String value1 = UUID.randomUUID().toString();
private final String id2 = UUID.randomUUID().toString();
private final String value2 = UUID.randomUUID().toString();
@BeforeEach
public void before() {
subject = new ViewRepositoryImpl<>();
subject.addView(TestView.newBuilder().withId(id1).withValue(value1).build());
subject.addView(TestView.newBuilder().withId(id2).withValue(value2).build());
Flux.range(0, 1000000)
.publishOn(Schedulers.elastic())
.map(String::valueOf)
.doOnNext(v -> subject.addView(TestView.newBuilder().withId(v).withValue(v).build()))
.collectList().block();
}
@Test
void findAllBy_predictionIsForAll_isReturnedAll() {
var source = subject.findAllBy(v -> true);
StepVerifier.create(source)
.expectNextCount(1000002)
.verifyComplete();
}
@Test
void findAllBy_predictionForTwo_isReturnedTwo() {
Predicate<TestView> prediction1 = v -> id1.equals(v.getId());
Predicate<TestView> prediction2 = v -> id2.equals(v.getId());
var source = subject.findAllBy(prediction1.or(prediction2));
StepVerifier.create(source)
.expectNextCount(2)
.verifyComplete();
}
@Test
void findAllListBy_predictionForTwo_isReturnedTwo() {
Predicate<TestView> prediction1 = v -> id1.equals(v.getId());
Predicate<TestView> prediction2 = v -> id2.equals(v.getId());
var source = subject.findAllListBy(prediction1.or(prediction2));
StepVerifier.create(source)
.expectNextMatches(list -> list.size() == 2)
.verifyComplete();
}
@Test
void findBy_filteredById_isReturnedOne() {
Predicate<TestView> prediction1 = v -> id1.equals(v.getId());
var source = subject.findBy(prediction1);
StepVerifier.create(source)
.expectNextMatches(item -> item.getId().equals(id1))
.verifyComplete();
}
}

View File

@@ -4,7 +4,10 @@ include 'bank-account:bank-account-application'
include 'bank-account:account-domain' include 'bank-account:account-domain'
include 'bank-account:transaction-domain' include 'bank-account:transaction-domain'
include 'shared-dependencies' include 'shared-dependencies'
include 'common-persistance' include 'shared-spring-dependecies'
include 'common-persistence'
include 'common-components' include 'common-components'
include 'bank-account:account-domain-api' include 'bank-account:account-domain-api'
include 'bank-account:transaction-domain-api' include 'bank-account:transaction-domain-api'
include 'bank-account:account-api'
include 'bank-account:account-impl'