Update readme and code cleaning

This commit is contained in:
zemo
2020-11-15 18:38:09 +01:00
parent 98748f9aa3
commit 06fa8549dd
18 changed files with 198 additions and 121 deletions

View File

@@ -16,7 +16,7 @@ This demo is implemented using Spring reactor project.
* `Account` * `Account`
* `Transaction` * `Transaction`
Those two aggregates representing one bounded context `Bank account`. Those two aggregate roots representing two bounded contexts `Account bounded context` and `Transaction bounded context`.
## Supported operation by Account aggregate ## Supported operation by Account aggregate
`account-domain-api` module contains definition of contract provided by `Account` aggregate. `account-domain-api` module contains definition of contract provided by `Account` aggregate.
@@ -24,7 +24,6 @@ Here is the list of basic operations:
* create account * create account
* deposit money * deposit money
* withdraw money * withdraw money
* transfer money between two accounts
## Supported operation by Transaction aggregate ## Supported operation by Transaction aggregate
`transaction-domain-api` module contains definition of contract provided by `Transaction` aggregate `transaction-domain-api` module contains definition of contract provided by `Transaction` aggregate
@@ -32,6 +31,7 @@ Here is the list of basic operations:
* create transaction * create transaction
* cancel/rollback transaction * cancel/rollback transaction
* finish transaction * finish transaction
* transfer money between two accounts
## Persistence ## Persistence
Implementation is following CQRS and event sourcing design pattern. Persistence API is provided by Implementation is following CQRS and event sourcing design pattern. Persistence API is provided by

View File

@@ -7,10 +7,7 @@ import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositTransferMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositTransferMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawTransferMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawTransferMoney;
import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyDeposited; import com.mz.reactor.ddd.reactorddd.account.domain.event.*;
import com.mz.reactor.ddd.reactorddd.account.domain.event.TransferMoneyAccountNotFound;
import com.mz.reactor.ddd.reactorddd.account.domain.event.TransferMoneyDeposited;
import com.mz.reactor.ddd.reactorddd.account.domain.event.TransferMoneyWithdrawn;
import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionCreated; import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionCreated;
import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionDepositRolledBack; import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionDepositRolledBack;
import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionWithdrawRolledBack; import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionWithdrawRolledBack;
@@ -32,7 +29,11 @@ public class TransactionChangeStreamAdapter {
private final AccountQuery accountQuery; private final AccountQuery accountQuery;
public TransactionChangeStreamAdapter(ApplicationMessageBus messageBus, AccountApplicationService accountService, AccountQuery accountQuery) { public TransactionChangeStreamAdapter(
ApplicationMessageBus messageBus,
AccountApplicationService accountService,
AccountQuery accountQuery
) {
this.messageBus = Objects.requireNonNull(messageBus); this.messageBus = Objects.requireNonNull(messageBus);
this.accountService = Objects.requireNonNull(accountService); this.accountService = Objects.requireNonNull(accountService);
this.accountQuery = Objects.requireNonNull(accountQuery); this.accountQuery = Objects.requireNonNull(accountQuery);
@@ -100,7 +101,7 @@ public class TransactionChangeStreamAdapter {
.amount(e.amount()) .amount(e.amount())
.aggregateId(e.toAccountId()) .aggregateId(e.toAccountId())
.correlationId(e.correlationId()) .correlationId(e.correlationId())
.build(), MoneyDeposited.class)) .build(), MoneyWithdrawn.class))
.log() .log()
.retry() .retry()
.subscribe(); .subscribe();

View File

@@ -8,7 +8,9 @@ import org.immutables.value.Value;
@JsonSerialize(as = ImmutableWithdrawTransferMoneyFailed.class) @JsonSerialize(as = ImmutableWithdrawTransferMoneyFailed.class)
@JsonDeserialize(as = ImmutableWithdrawTransferMoneyFailed.class) @JsonDeserialize(as = ImmutableWithdrawTransferMoneyFailed.class)
public interface WithdrawTransferMoneyFailed extends TransferMoneyFailed { public interface WithdrawTransferMoneyFailed extends TransferMoneyFailed {
static ImmutableWithdrawTransferMoneyFailed.Builder builder() { static ImmutableWithdrawTransferMoneyFailed.Builder builder() {
return ImmutableWithdrawTransferMoneyFailed.builder(); return ImmutableWithdrawTransferMoneyFailed.builder();
} }
} }

View File

@@ -11,13 +11,13 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class AccountAggregate { public class AccountAggregate {
private Id aggregateId; private final Id aggregateId;
private Money amount; private Money amount;
private Set<Id> openedTransactions = new HashSet<>(); private Set<Id> openedTransactions = new HashSet<>();
private Set<Id> finishedTransactions = new HashSet<>(); private final Set<Id> finishedTransactions = new HashSet<>();
public AccountAggregate(String aggregateId) { public AccountAggregate(String aggregateId) {
this.aggregateId = new Id(aggregateId); this.aggregateId = new Id(aggregateId);

View File

@@ -2,5 +2,6 @@ dependencies {
api project(':common-api') api project(':common-api')
api project(':common-components') api project(':common-components')
api project(':bank-account:account:account-domain-api') api project(':bank-account:account:account-domain-api')
implementation project(':common-persistence-api')
implementation project(':bank-account:account:account-api') implementation project(':bank-account:account:account-api')
} }

View File

@@ -48,7 +48,7 @@ public class AccountHandler implements HttpHandler {
public Mono<ServerResponse> getAll(ServerRequest request) { public Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok() return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(accountQuery.getAll(), AccountState.class); .body(accountQuery.getAll(), AccountState.class);
} }
@@ -78,11 +78,11 @@ public class AccountHandler implements HttpHandler {
public RouterFunction<ServerResponse> route() { public RouterFunction<ServerResponse> route() {
var route = RouterFunctions var route = RouterFunctions
.route(POST("").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::createAccount) .route(POST("").and(accept(MediaType.APPLICATION_JSON)), this::createAccount)
.andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getAll) .andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON)), this::getAll)
.andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getById) .andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON)), this::getById)
.andRoute(PUT("/moneys/withdraw").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::withdrawMoney) .andRoute(PUT("/moneys/withdraw").and(accept(MediaType.APPLICATION_JSON)), this::withdrawMoney)
.andRoute(PUT("/moneys/deposit").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::depositMoney); .andRoute(PUT("/moneys/deposit").and(accept(MediaType.APPLICATION_JSON)), this::depositMoney);
return RouterFunctions.route() return RouterFunctions.route()
.nest(path("/accounts"), () -> route) .nest(path("/accounts"), () -> route)

View File

@@ -31,6 +31,7 @@ public class AccountApplicationServiceImpl implements AccountApplicationService
this.aggregateFacade = aggregateFacade; this.aggregateFacade = aggregateFacade;
} }
@Override
public <R extends DomainEvent> Mono<R> execute(AccountCommand cmd, Class<R> eventType) { public <R extends DomainEvent> Mono<R> execute(AccountCommand cmd, Class<R> eventType) {
return this.aggregateFacade.executeReturnEvent(cmd, cmd.aggregateId(), eventType) return this.aggregateFacade.executeReturnEvent(cmd, cmd.aggregateId(), eventType)
.cast(eventType); .cast(eventType);

View File

@@ -11,6 +11,7 @@ import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateFacade;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository; import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateFacadeImpl; import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateFacadeImpl;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateRepositoryImpl; import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateRepositoryImpl;
import com.mz.reactor.ddd.reactorddd.persistance.query.Query;
import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository; import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository;
import com.mz.reactor.ddd.reactorddd.persistance.view.impl.impl.ViewRepositoryImpl; import com.mz.reactor.ddd.reactorddd.persistance.view.impl.impl.ViewRepositoryImpl;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
@@ -25,12 +26,26 @@ public class AccountConfiguration {
public static final String ACCOUNT_AGGREGATE_REPOSITORY = "accountAggregateRepository"; public static final String ACCOUNT_AGGREGATE_REPOSITORY = "accountAggregateRepository";
public static final String ACCOUNT_AGGREGATE_FACADE = "accountAggregateFacade"; public static final String ACCOUNT_AGGREGATE_FACADE = "accountAggregateFacade";
public static final String ACCOUNT_VIEW_REPOSITORY = "accountViewRepository"; public static final String ACCOUNT_VIEW_REPOSITORY = "accountViewRepository";
public static final String ACCOUNT_QUERY_SERVICE = "accountQueryService";
private final AccountEventHandler accountEventApplier = new AccountEventHandler(); private final AccountEventHandler accountEventApplier = new AccountEventHandler();
private final AccountCommandHandler accountCommandHandler = new AccountCommandHandler(); private final AccountCommandHandler accountCommandHandler = new AccountCommandHandler();
private final Function<Id, AccountAggregate> aggregateFactory = id -> new AccountAggregate(id.getValue()); private final Function<Id, AccountAggregate> aggregateFactory = id -> new AccountAggregate(id.getValue());
private final Function<AccountAggregate, AccountState> stateFactory = AccountAggregate::getState; private final Function<AccountAggregate, AccountState> stateFactory = AccountAggregate::getState;
@Bean(ACCOUNT_QUERY_SERVICE)
public Query<AccountState> accountQueryService(
@Qualifier(ACCOUNT_VIEW_REPOSITORY) ViewRepository<AccountState> viewRepository,
ApplicationMessageBus bus
) {
return Query.of(
viewRepository,
() -> bus.messagesStream()
.filter(m -> m instanceof AccountState)
.cast(AccountState.class)
);
}
@Bean(ACCOUNT_AGGREGATE_REPOSITORY) @Bean(ACCOUNT_AGGREGATE_REPOSITORY)
public AggregateRepository<AccountAggregate, AccountCommand, AccountState> getAggregateRepository() { public AggregateRepository<AccountAggregate, AccountCommand, AccountState> getAggregateRepository() {
return new AggregateRepositoryImpl<>(accountCommandHandler, accountEventApplier, aggregateFactory, stateFactory); return new AggregateRepositoryImpl<>(accountCommandHandler, accountEventApplier, aggregateFactory, stateFactory);

View File

@@ -40,7 +40,7 @@ public class BankAccountAppConfiguration {
.add(accountHandler.route()) .add(accountHandler.route())
.add(transactionHandler.route()) .add(transactionHandler.route())
.GET("/health", req -> ServerResponse.ok() .GET("/health", req -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(Mono.just("Tick"), String.class)) .body(Mono.just("Tick"), String.class))
.onError(Throwable.class, .onError(Throwable.class,
(throwable, serverRequest) -> HttpHandlers.onError(throwable, serverRequest, error -> log.error("Error: ", error))) (throwable, serverRequest) -> HttpHandlers.onError(throwable, serverRequest, error -> log.error("Error: ", error)))

View File

@@ -61,21 +61,21 @@ public class BankAccountAppTest {
.build()) .build())
.build(); .build();
return webTestClient.post().uri("/accounts") return webTestClient.post().uri("/accounts")
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(createAccount1)) .body(BodyInserters.fromObject(createAccount1))
.exchange() .exchange()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) .expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(CreateAccountResponse.class) .expectBody(CreateAccountResponse.class)
.returnResult().getResponseBody(); .returnResult().getResponseBody();
} }
private List<AccountState> getAllAccounts() { private List<AccountState> getAllAccounts() {
return webTestClient.get().uri("/accounts") return webTestClient.get().uri("/accounts")
.accept(MediaType.APPLICATION_JSON_UTF8) .accept(MediaType.APPLICATION_JSON)
.exchange() .exchange()
.expectStatus() .expectStatus()
.isOk() .isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) .expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(List.class).returnResult().getResponseBody(); .expectBody(List.class).returnResult().getResponseBody();
} }
@@ -90,39 +90,39 @@ public class BankAccountAppTest {
.build(); .build();
return webTestClient.post().uri("/transactions") return webTestClient.post().uri("/transactions")
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(createTransactionRequest)) .body(BodyInserters.fromObject(createTransactionRequest))
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) .expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(CreateTransactionResponse.class).returnResult().getResponseBody(); .expectBody(CreateTransactionResponse.class).returnResult().getResponseBody();
} }
private AccountState getAccount(String id) { private AccountState getAccount(String id) {
return webTestClient.get().uri("/accounts/{id}", id) return webTestClient.get().uri("/accounts/{id}", id)
.accept(MediaType.APPLICATION_JSON_UTF8) .accept(MediaType.APPLICATION_JSON)
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
// .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) // .expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(AccountState.class).returnResult().getResponseBody(); .expectBody(AccountState.class).returnResult().getResponseBody();
} }
private String getAccountString(String id) { private String getAccountString(String id) {
return webTestClient.get().uri("/accounts/{id}", id) return webTestClient.get().uri("/accounts/{id}", id)
.accept(MediaType.APPLICATION_JSON_UTF8) .accept(MediaType.APPLICATION_JSON)
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
// .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) // .expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(String.class).returnResult().getResponseBody(); .expectBody(String.class).returnResult().getResponseBody();
} }
private TransactionState getTransaction(String id) { private TransactionState getTransaction(String id) {
return webTestClient.get().uri("/transactions/{id}", id) return webTestClient.get().uri("/transactions/{id}", id)
.accept(MediaType.APPLICATION_JSON_UTF8) .accept(MediaType.APPLICATION_JSON)
.exchange() .exchange()
.expectStatus() .expectStatus()
.isOk() .isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) .expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(TransactionState.class).returnResult().getResponseBody(); .expectBody(TransactionState.class).returnResult().getResponseBody();
} }
} }

View File

@@ -59,7 +59,7 @@ public class AccountChangeStreamAdapter {
} }
private void handleTransferMoneyWithdrawn(ApplicationMessageBus bus) { private void handleTransferMoneyWithdrawn(ApplicationMessageBus bus) {
log.debug("handleTransferMoneyDeposited ->"); log.debug("handleTransferMoneyWithdrawn ->");
bus.messagesStream() bus.messagesStream()
.filter(m -> m instanceof TransferMoneyWithdrawn) .filter(m -> m instanceof TransferMoneyWithdrawn)
.cast(TransferMoneyWithdrawn.class) .cast(TransferMoneyWithdrawn.class)

View File

@@ -26,7 +26,11 @@ public class TransactionHandler implements HttpHandler {
private final ObjectMapper mapper; private final ObjectMapper mapper;
public TransactionHandler(TransactionApplicationService transactionApplicationService, TransactionQuery query, ObjectMapper mapper) { public TransactionHandler(
TransactionApplicationService transactionApplicationService,
TransactionQuery query,
ObjectMapper mapper
) {
this.service = Objects.requireNonNull(transactionApplicationService); this.service = Objects.requireNonNull(transactionApplicationService);
this.query = Objects.requireNonNull(query); this.query = Objects.requireNonNull(query);
this.mapper = mapper; this.mapper = mapper;
@@ -34,7 +38,7 @@ public class TransactionHandler implements HttpHandler {
public Mono<ServerResponse> getAll(ServerRequest request) { public Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok() return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(query.getAll(), TransactionState.class); .body(query.getAll(), TransactionState.class);
} }
@@ -54,9 +58,9 @@ public class TransactionHandler implements HttpHandler {
@Override @Override
public RouterFunction<ServerResponse> route() { public RouterFunction<ServerResponse> route() {
var route = RouterFunctions var route = RouterFunctions
.route(POST("").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::createTransaction) .route(POST("").and(accept(MediaType.APPLICATION_JSON)), this::createTransaction)
.andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getAll) .andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON)), this::getAll)
.andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getById); .andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON)), this::getById);
return RouterFunctions.route() return RouterFunctions.route()
.nest(path("/transactions"), () -> route) .nest(path("/transactions"), () -> route)

View File

@@ -9,7 +9,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import static com.mz.reactor.ddd.common.components.http.HttpHandlers.deserializeJsonString; import static com.mz.reactor.ddd.common.components.http.HttpHandlers.deserializeJsonString;
import static org.springframework.web.reactive.function.BodyInserters.fromObject; import static org.springframework.web.reactive.function.BodyInserters.fromValue;
public interface HttpHandler { public interface HttpHandler {
@@ -29,8 +29,8 @@ public interface HttpHandler {
default <T> Mono<ServerResponse> mapToResponse(T result) { default <T> Mono<ServerResponse> mapToResponse(T result) {
return ServerResponse.ok() return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(fromObject(result)); .body(fromValue(result));
} }
} }

View File

@@ -13,18 +13,11 @@ import javax.annotation.Nonnull;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import static org.springframework.web.reactive.function.BodyInserters.fromObject; import static org.springframework.web.reactive.function.BodyInserters.fromValue;
public final class HttpHandlers { public final class HttpHandlers {
private HttpHandlers() {} private HttpHandlers() {}
// private final ObjectMapper mapper;
//
// HttpHandlerFunctions() {
// mapper = new ObjectMapper();
// mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
// mapper.registerModule(new Jdk8Module());
// }
public static <T> Function<String, Mono<T>> deserializeJsonString( public static <T> Function<String, Mono<T>> deserializeJsonString(
@Nonnull Class<T> clazz, @Nonnull Class<T> clazz,
@@ -43,7 +36,7 @@ public final class HttpHandlers {
logger.accept(e); logger.accept(e);
return ErrorMessage.builder().error(e.getMessage()).build(); return ErrorMessage.builder().error(e.getMessage()).build();
}).flatMap(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR) }).flatMap(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.contentType(MediaType.APPLICATION_JSON_UTF8) .contentType(MediaType.APPLICATION_JSON)
.body(fromObject(error))); .body(fromValue(error)));
} }
} }

View File

@@ -44,7 +44,7 @@ public class AggregateFacadeImpl<A, C extends Command, S> implements AggregateFa
public Mono<? extends DomainEvent> executeReturnEvent(C command, String aggregateID, Class<? extends DomainEvent> eventType) { public Mono<? extends DomainEvent> executeReturnEvent(C command, String aggregateID, Class<? extends DomainEvent> eventType) {
var result = aggregateRepository.execute(command, new Id(aggregateID)); var result = aggregateRepository.execute(command, new Id(aggregateID));
return result.flatMap(cr -> processResult(aggregateID, eventType, cr)) return result.flatMap(cr -> processResult(aggregateID, eventType, cr))
.doOnError(error -> log.error("execute -> event type: "+eventType, error)); .doOnError(error -> log.error("execute -> event type: " + eventType, error));
} }
@Override @Override

View File

@@ -23,87 +23,88 @@ import static java.util.stream.Collectors.toList;
public class AggregateRepositoryImpl<A, C extends Command, S> implements AggregateRepository<A, C, S> { public class AggregateRepositoryImpl<A, C extends Command, S> implements AggregateRepository<A, C, S> {
private final AtomicReference<Map<Id, List<? extends DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>()); private final AtomicReference<Map<Id, List<? extends DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>());
private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder() private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10)) .expireAfterAccess(Duration.ofMinutes(10))
.removalListener((RemovalListener<Id, AggregateActor<A, C>>) notification -> notification.getValue().onDestroy()) .removalListener((RemovalListener<Id, AggregateActor<A, C>>) notification -> notification.getValue().onDestroy())
.build(); .build();
private final CommandHandler<A, C> commandHandler; private final CommandHandler<A, C> commandHandler;
private final EventHandler<A> eventHandler; private final EventHandler<A> eventHandler;
private final Function<Id, A> aggregateFactory; private final Function<Id, A> aggregateFactory;
private final Function<A, S> stateFactory; private final Function<A, S> stateFactory;
public AggregateRepositoryImpl( public AggregateRepositoryImpl(
CommandHandler<A, C> commandHandler, CommandHandler<A, C> commandHandler,
EventHandler<A> eventHandler, EventHandler<A> eventHandler,
Function<Id, A> aggregateFactory, Function<Id, A> aggregateFactory,
Function<A, S> stateFactory Function<A, S> stateFactory
) { ) {
this.commandHandler = commandHandler; this.commandHandler = commandHandler;
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.stateFactory = stateFactory; this.stateFactory = stateFactory;
}
private List<? extends DomainEvent> persistAll(Id id, List<? extends DomainEvent> events) {
eventSource.updateAndGet(esMap -> {
var eventsToStore = Optional.ofNullable(esMap.get(id))
.map(es -> Stream.concat(es.stream(), events.stream())
.sorted(Comparator.comparing(DomainEvent::createdAt))
.collect(toList()))
.orElse((List<DomainEvent>) events);
esMap.put(id, eventsToStore);
return esMap;
});
return events;
}
private Mono<AggregateActor<A, C>> getAggregate(Id id) {
return Mono.just(id)
.map(this::getFromCache);
}
private AggregateActor<A, C> getFromCache(Id id) {
try {
return cache.get(id, () ->
new AggregateActorImpl<>(
id,
commandHandler,
eventHandler,
aggregateFactory,
Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of),
this::persistAll
)
);
} catch (Exception e) {
throw new RuntimeException(e);
} }
}
private List<? extends DomainEvent> persistAll(Id id, List<? extends DomainEvent> events) { @Override
eventSource.updateAndGet(esMap -> { public Mono<CommandResult> execute(C cmd, Id aggregateId) {
var eventsToStore = Optional.ofNullable(esMap.get(id)) return getAggregate(aggregateId)
.map(es -> Stream.concat(es.stream(), events.stream()) .flatMap(a -> a.execute(cmd));
.sorted(Comparator.comparing(DomainEvent::createdAt)) }
.collect(toList()))
.orElse((List<DomainEvent>) events);
esMap.put(id, eventsToStore);
return esMap;
});
return events;
}
private Mono<AggregateActor<A, C>> getAggregate(Id id) { @Override
return Mono.just(id) public Mono<S> findById(Id id) {
.map(this::getFromCache); return getAggregate(id).flatMap(a -> a.getState(this.stateFactory));
} }
private AggregateActor<A, C> getFromCache(Id id) { @Override
try { public Mono<S> findIfExists(Id id) {
return cache.get(id, () -> return Mono.just(id)
new AggregateActorImpl<A, C>( .flatMap(
id, i -> Optional.ofNullable(cache.getIfPresent(i))
commandHandler, .map(a -> a.getState(this.stateFactory))
eventHandler, .orElseGet(Mono::empty)
aggregateFactory, );
Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of), }
this::persistAll
)
);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Mono<CommandResult> execute(C cmd, Id aggregateId) {
return getAggregate(aggregateId)
.flatMap(a -> a.execute(cmd));
}
@Override
public Mono<S> findById(Id id) {
return getAggregate(id).flatMap(a -> a.getState(this.stateFactory));
}
@Override
public Mono<S> findIfExists(Id id) {
return Mono.just(id)
.flatMap(i -> Optional.ofNullable(cache.getIfPresent(i))
.map(a -> a.getState(this.stateFactory))
.orElseGet(Mono::empty)
);
}
} }

View File

@@ -0,0 +1,20 @@
package com.mz.reactor.ddd.reactorddd.persistance.query;
import com.mz.reactor.ddd.common.api.view.DomainView;
import com.mz.reactor.ddd.reactorddd.persistance.query.impl.QueryImpl;
import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Supplier;
public interface Query<S extends DomainView> {
Mono<S> findById(String id);
Flux<S> getAll();
static <S extends DomainView> Query<S> of(ViewRepository<S> viewRepository, Supplier<Flux<S>> documentStream) {
return new QueryImpl<>(viewRepository, documentStream);
}
}

View File

@@ -0,0 +1,39 @@
package com.mz.reactor.ddd.reactorddd.persistance.query.impl;
import com.mz.reactor.ddd.common.api.view.DomainView;
import com.mz.reactor.ddd.reactorddd.persistance.query.Query;
import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository;
import org.springframework.lang.NonNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
public class QueryImpl<S extends DomainView> implements Query<S> {
private final ViewRepository<S> repository;
private final Predicate<S> getAll = v -> true;
public QueryImpl(
ViewRepository<S> repository,
Supplier<Flux<S>> documentStream
) {
this.repository = requireNonNull(repository, "repository is required");
requireNonNull(documentStream, "documentStream is required").get()
.subscribe(repository::addView);
}
@Override
public Mono<S> findById(@NonNull String id) {
return repository.findBy(view -> view.id().equals(id));
}
@Override
public Flux<S> getAll() {
return repository.findAllBy(getAll);
}
}