From 06fa8549dd6561c0e92c222a170171653ab5d7ae Mon Sep 17 00:00:00 2001 From: zemo Date: Sun, 15 Nov 2020 18:38:09 +0100 Subject: [PATCH] Update readme and code cleaning --- README.md | 6 +- .../TransactionChangeStreamAdapter.java | 13 +- .../event/WithdrawTransferMoneyFailed.java | 2 + .../account/domain/AccountAggregate.java | 4 +- .../account/account-http-api/build.gradle | 1 + .../account/http/AccountHandler.java | 12 +- .../impl/AccountApplicationServiceImpl.java | 1 + .../account/wiring/AccountConfiguration.java | 15 ++ .../BankAccountAppConfiguration.java | 2 +- .../application/BankAccountAppTest.java | 24 +-- .../account/AccountChangeStreamAdapter.java | 2 +- .../transaction/api/TransactionHandler.java | 14 +- .../common/components/http/HttpHandler.java | 6 +- .../common/components/http/HttpHandlers.java | 13 +- .../aggregate/impl/AggregateFacadeImpl.java | 2 +- .../impl/AggregateRepositoryImpl.java | 143 +++++++++--------- .../reactorddd/persistance/query/Query.java | 20 +++ .../persistance/query/impl/QueryImpl.java | 39 +++++ 18 files changed, 198 insertions(+), 121 deletions(-) create mode 100644 common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/Query.java create mode 100644 common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/impl/QueryImpl.java diff --git a/README.md b/README.md index b790ca1..2dc681e 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ This demo is implemented using Spring reactor project. * `Account` * `Transaction` -Those two aggregates representing one bounded context `Bank account`. +Those two aggregate roots representing two bounded contexts `Account bounded context` and `Transaction bounded context`. ## Supported operation by Account aggregate `account-domain-api` module contains definition of contract provided by `Account` aggregate. @@ -24,14 +24,14 @@ Here is the list of basic operations: * create account * deposit money * withdraw money -* transfer money between two accounts ## Supported operation by Transaction aggregate `transaction-domain-api` module contains definition of contract provided by `Transaction` aggregate Here is the list of basic operations: * create transaction * cancel/rollback transaction -* finish transaction +* finish transaction +* transfer money between two accounts ## Persistence Implementation is following CQRS and event sourcing design pattern. Persistence API is provided by diff --git a/bank-account/account/account-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/transaction/TransactionChangeStreamAdapter.java b/bank-account/account/account-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/transaction/TransactionChangeStreamAdapter.java index 72fb7b4..bc0736b 100755 --- a/bank-account/account/account-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/transaction/TransactionChangeStreamAdapter.java +++ b/bank-account/account/account-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/transaction/TransactionChangeStreamAdapter.java @@ -7,10 +7,7 @@ import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.DepositTransferMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawMoney; import com.mz.reactor.ddd.reactorddd.account.domain.command.WithdrawTransferMoney; -import com.mz.reactor.ddd.reactorddd.account.domain.event.MoneyDeposited; -import com.mz.reactor.ddd.reactorddd.account.domain.event.TransferMoneyAccountNotFound; -import com.mz.reactor.ddd.reactorddd.account.domain.event.TransferMoneyDeposited; -import com.mz.reactor.ddd.reactorddd.account.domain.event.TransferMoneyWithdrawn; +import com.mz.reactor.ddd.reactorddd.account.domain.event.*; import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionCreated; import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionDepositRolledBack; import com.mz.reactor.ddd.reactorddd.transaction.domain.event.TransactionWithdrawRolledBack; @@ -32,7 +29,11 @@ public class TransactionChangeStreamAdapter { private final AccountQuery accountQuery; - public TransactionChangeStreamAdapter(ApplicationMessageBus messageBus, AccountApplicationService accountService, AccountQuery accountQuery) { + public TransactionChangeStreamAdapter( + ApplicationMessageBus messageBus, + AccountApplicationService accountService, + AccountQuery accountQuery + ) { this.messageBus = Objects.requireNonNull(messageBus); this.accountService = Objects.requireNonNull(accountService); this.accountQuery = Objects.requireNonNull(accountQuery); @@ -100,7 +101,7 @@ public class TransactionChangeStreamAdapter { .amount(e.amount()) .aggregateId(e.toAccountId()) .correlationId(e.correlationId()) - .build(), MoneyDeposited.class)) + .build(), MoneyWithdrawn.class)) .log() .retry() .subscribe(); diff --git a/bank-account/account/account-domain-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/event/WithdrawTransferMoneyFailed.java b/bank-account/account/account-domain-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/event/WithdrawTransferMoneyFailed.java index c9d7861..7b63d83 100755 --- a/bank-account/account/account-domain-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/event/WithdrawTransferMoneyFailed.java +++ b/bank-account/account/account-domain-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/event/WithdrawTransferMoneyFailed.java @@ -8,7 +8,9 @@ import org.immutables.value.Value; @JsonSerialize(as = ImmutableWithdrawTransferMoneyFailed.class) @JsonDeserialize(as = ImmutableWithdrawTransferMoneyFailed.class) public interface WithdrawTransferMoneyFailed extends TransferMoneyFailed { + static ImmutableWithdrawTransferMoneyFailed.Builder builder() { return ImmutableWithdrawTransferMoneyFailed.builder(); } + } diff --git a/bank-account/account/account-domain/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/AccountAggregate.java b/bank-account/account/account-domain/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/AccountAggregate.java index f4252fc..4a764ef 100755 --- a/bank-account/account/account-domain/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/AccountAggregate.java +++ b/bank-account/account/account-domain/src/main/java/com/mz/reactor/ddd/reactorddd/account/domain/AccountAggregate.java @@ -11,13 +11,13 @@ import java.util.Set; import java.util.stream.Collectors; public class AccountAggregate { - private Id aggregateId; + private final Id aggregateId; private Money amount; private Set openedTransactions = new HashSet<>(); - private Set finishedTransactions = new HashSet<>(); + private final Set finishedTransactions = new HashSet<>(); public AccountAggregate(String aggregateId) { this.aggregateId = new Id(aggregateId); diff --git a/bank-account/account/account-http-api/build.gradle b/bank-account/account/account-http-api/build.gradle index 9a2606c..131be5b 100755 --- a/bank-account/account/account-http-api/build.gradle +++ b/bank-account/account/account-http-api/build.gradle @@ -2,5 +2,6 @@ dependencies { api project(':common-api') api project(':common-components') api project(':bank-account:account:account-domain-api') + implementation project(':common-persistence-api') implementation project(':bank-account:account:account-api') } \ No newline at end of file diff --git a/bank-account/account/account-http-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/http/AccountHandler.java b/bank-account/account/account-http-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/http/AccountHandler.java index 552d2af..4a6ac2b 100755 --- a/bank-account/account/account-http-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/http/AccountHandler.java +++ b/bank-account/account/account-http-api/src/main/java/com/mz/reactor/ddd/reactorddd/account/http/AccountHandler.java @@ -48,7 +48,7 @@ public class AccountHandler implements HttpHandler { public Mono getAll(ServerRequest request) { return ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON_UTF8) + .contentType(MediaType.APPLICATION_JSON) .body(accountQuery.getAll(), AccountState.class); } @@ -78,11 +78,11 @@ public class AccountHandler implements HttpHandler { public RouterFunction route() { var route = RouterFunctions - .route(POST("").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::createAccount) - .andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getAll) - .andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getById) - .andRoute(PUT("/moneys/withdraw").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::withdrawMoney) - .andRoute(PUT("/moneys/deposit").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::depositMoney); + .route(POST("").and(accept(MediaType.APPLICATION_JSON)), this::createAccount) + .andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON)), this::getAll) + .andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON)), this::getById) + .andRoute(PUT("/moneys/withdraw").and(accept(MediaType.APPLICATION_JSON)), this::withdrawMoney) + .andRoute(PUT("/moneys/deposit").and(accept(MediaType.APPLICATION_JSON)), this::depositMoney); return RouterFunctions.route() .nest(path("/accounts"), () -> route) diff --git a/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/impl/AccountApplicationServiceImpl.java b/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/impl/AccountApplicationServiceImpl.java index e7452ef..97623d8 100755 --- a/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/impl/AccountApplicationServiceImpl.java +++ b/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/impl/AccountApplicationServiceImpl.java @@ -31,6 +31,7 @@ public class AccountApplicationServiceImpl implements AccountApplicationService this.aggregateFacade = aggregateFacade; } + @Override public Mono execute(AccountCommand cmd, Class eventType) { return this.aggregateFacade.executeReturnEvent(cmd, cmd.aggregateId(), eventType) .cast(eventType); diff --git a/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/wiring/AccountConfiguration.java b/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/wiring/AccountConfiguration.java index b2393f6..c9b7ccd 100755 --- a/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/wiring/AccountConfiguration.java +++ b/bank-account/account/account-impl/src/main/java/com/mz/reactor/ddd/reactorddd/account/wiring/AccountConfiguration.java @@ -11,6 +11,7 @@ import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateFacade; import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository; import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateFacadeImpl; import com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl.AggregateRepositoryImpl; +import com.mz.reactor.ddd.reactorddd.persistance.query.Query; import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository; import com.mz.reactor.ddd.reactorddd.persistance.view.impl.impl.ViewRepositoryImpl; import org.springframework.beans.factory.annotation.Qualifier; @@ -25,12 +26,26 @@ public class AccountConfiguration { public static final String ACCOUNT_AGGREGATE_REPOSITORY = "accountAggregateRepository"; public static final String ACCOUNT_AGGREGATE_FACADE = "accountAggregateFacade"; public static final String ACCOUNT_VIEW_REPOSITORY = "accountViewRepository"; + public static final String ACCOUNT_QUERY_SERVICE = "accountQueryService"; private final AccountEventHandler accountEventApplier = new AccountEventHandler(); private final AccountCommandHandler accountCommandHandler = new AccountCommandHandler(); private final Function aggregateFactory = id -> new AccountAggregate(id.getValue()); private final Function stateFactory = AccountAggregate::getState; + @Bean(ACCOUNT_QUERY_SERVICE) + public Query accountQueryService( + @Qualifier(ACCOUNT_VIEW_REPOSITORY) ViewRepository viewRepository, + ApplicationMessageBus bus + ) { + return Query.of( + viewRepository, + () -> bus.messagesStream() + .filter(m -> m instanceof AccountState) + .cast(AccountState.class) + ); + } + @Bean(ACCOUNT_AGGREGATE_REPOSITORY) public AggregateRepository getAggregateRepository() { return new AggregateRepositoryImpl<>(accountCommandHandler, accountEventApplier, aggregateFactory, stateFactory); diff --git a/bank-account/bank-account-application/src/main/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppConfiguration.java b/bank-account/bank-account-application/src/main/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppConfiguration.java index 4ab9b57..d2bd2bb 100755 --- a/bank-account/bank-account-application/src/main/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppConfiguration.java +++ b/bank-account/bank-account-application/src/main/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppConfiguration.java @@ -40,7 +40,7 @@ public class BankAccountAppConfiguration { .add(accountHandler.route()) .add(transactionHandler.route()) .GET("/health", req -> ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON_UTF8) + .contentType(MediaType.APPLICATION_JSON) .body(Mono.just("Tick"), String.class)) .onError(Throwable.class, (throwable, serverRequest) -> HttpHandlers.onError(throwable, serverRequest, error -> log.error("Error: ", error))) diff --git a/bank-account/bank-account-application/src/test/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppTest.java b/bank-account/bank-account-application/src/test/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppTest.java index 4f5beeb..776dac3 100755 --- a/bank-account/bank-account-application/src/test/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppTest.java +++ b/bank-account/bank-account-application/src/test/java/com/mz/reactor/ddd/reactorddd/application/BankAccountAppTest.java @@ -61,21 +61,21 @@ public class BankAccountAppTest { .build()) .build(); return webTestClient.post().uri("/accounts") - .contentType(MediaType.APPLICATION_JSON_UTF8) + .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromObject(createAccount1)) .exchange() - .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) + .expectHeader().contentType(MediaType.APPLICATION_JSON) .expectBody(CreateAccountResponse.class) .returnResult().getResponseBody(); } private List getAllAccounts() { return webTestClient.get().uri("/accounts") - .accept(MediaType.APPLICATION_JSON_UTF8) + .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus() .isOk() - .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) + .expectHeader().contentType(MediaType.APPLICATION_JSON) .expectBody(List.class).returnResult().getResponseBody(); } @@ -90,39 +90,39 @@ public class BankAccountAppTest { .build(); return webTestClient.post().uri("/transactions") - .contentType(MediaType.APPLICATION_JSON_UTF8) + .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromObject(createTransactionRequest)) .exchange() .expectStatus().isOk() - .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) + .expectHeader().contentType(MediaType.APPLICATION_JSON) .expectBody(CreateTransactionResponse.class).returnResult().getResponseBody(); } private AccountState getAccount(String id) { return webTestClient.get().uri("/accounts/{id}", id) - .accept(MediaType.APPLICATION_JSON_UTF8) + .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() -// .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) +// .expectHeader().contentType(MediaType.APPLICATION_JSON) .expectBody(AccountState.class).returnResult().getResponseBody(); } private String getAccountString(String id) { return webTestClient.get().uri("/accounts/{id}", id) - .accept(MediaType.APPLICATION_JSON_UTF8) + .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() -// .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) +// .expectHeader().contentType(MediaType.APPLICATION_JSON) .expectBody(String.class).returnResult().getResponseBody(); } private TransactionState getTransaction(String id) { return webTestClient.get().uri("/transactions/{id}", id) - .accept(MediaType.APPLICATION_JSON_UTF8) + .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus() .isOk() - .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) + .expectHeader().contentType(MediaType.APPLICATION_JSON) .expectBody(TransactionState.class).returnResult().getResponseBody(); } } diff --git a/bank-account/transaction/transaction-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/account/AccountChangeStreamAdapter.java b/bank-account/transaction/transaction-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/account/AccountChangeStreamAdapter.java index fa2d9c0..d205031 100755 --- a/bank-account/transaction/transaction-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/account/AccountChangeStreamAdapter.java +++ b/bank-account/transaction/transaction-adapters/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/adapters/account/AccountChangeStreamAdapter.java @@ -59,7 +59,7 @@ public class AccountChangeStreamAdapter { } private void handleTransferMoneyWithdrawn(ApplicationMessageBus bus) { - log.debug("handleTransferMoneyDeposited ->"); + log.debug("handleTransferMoneyWithdrawn ->"); bus.messagesStream() .filter(m -> m instanceof TransferMoneyWithdrawn) .cast(TransferMoneyWithdrawn.class) diff --git a/bank-account/transaction/transaction-api/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/api/TransactionHandler.java b/bank-account/transaction/transaction-api/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/api/TransactionHandler.java index e251218..be24322 100755 --- a/bank-account/transaction/transaction-api/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/api/TransactionHandler.java +++ b/bank-account/transaction/transaction-api/src/main/java/com/mz/reactor/ddd/reactorddd/transaction/api/TransactionHandler.java @@ -26,7 +26,11 @@ public class TransactionHandler implements HttpHandler { private final ObjectMapper mapper; - public TransactionHandler(TransactionApplicationService transactionApplicationService, TransactionQuery query, ObjectMapper mapper) { + public TransactionHandler( + TransactionApplicationService transactionApplicationService, + TransactionQuery query, + ObjectMapper mapper + ) { this.service = Objects.requireNonNull(transactionApplicationService); this.query = Objects.requireNonNull(query); this.mapper = mapper; @@ -34,7 +38,7 @@ public class TransactionHandler implements HttpHandler { public Mono getAll(ServerRequest request) { return ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON_UTF8) + .contentType(MediaType.APPLICATION_JSON) .body(query.getAll(), TransactionState.class); } @@ -54,9 +58,9 @@ public class TransactionHandler implements HttpHandler { @Override public RouterFunction route() { var route = RouterFunctions - .route(POST("").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::createTransaction) - .andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getAll) - .andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON_UTF8)), this::getById); + .route(POST("").and(accept(MediaType.APPLICATION_JSON)), this::createTransaction) + .andRoute(GET("/").and(accept(MediaType.APPLICATION_JSON)), this::getAll) + .andRoute(GET("/{id}").and(accept(MediaType.APPLICATION_JSON)), this::getById); return RouterFunctions.route() .nest(path("/transactions"), () -> route) diff --git a/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandler.java b/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandler.java index 018c1f6..7cb4f58 100755 --- a/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandler.java +++ b/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandler.java @@ -9,7 +9,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import static com.mz.reactor.ddd.common.components.http.HttpHandlers.deserializeJsonString; -import static org.springframework.web.reactive.function.BodyInserters.fromObject; +import static org.springframework.web.reactive.function.BodyInserters.fromValue; public interface HttpHandler { @@ -29,8 +29,8 @@ public interface HttpHandler { default Mono mapToResponse(T result) { return ServerResponse.ok() - .contentType(MediaType.APPLICATION_JSON_UTF8) - .body(fromObject(result)); + .contentType(MediaType.APPLICATION_JSON) + .body(fromValue(result)); } } diff --git a/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandlers.java b/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandlers.java index f3e7bb7..1b588ee 100755 --- a/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandlers.java +++ b/common-components/src/main/java/com/mz/reactor/ddd/common/components/http/HttpHandlers.java @@ -13,18 +13,11 @@ import javax.annotation.Nonnull; import java.util.function.Consumer; import java.util.function.Function; -import static org.springframework.web.reactive.function.BodyInserters.fromObject; +import static org.springframework.web.reactive.function.BodyInserters.fromValue; public final class HttpHandlers { private HttpHandlers() {} -// private final ObjectMapper mapper; -// -// HttpHandlerFunctions() { -// mapper = new ObjectMapper(); -// mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); -// mapper.registerModule(new Jdk8Module()); -// } public static Function> deserializeJsonString( @Nonnull Class clazz, @@ -43,7 +36,7 @@ public final class HttpHandlers { logger.accept(e); return ErrorMessage.builder().error(e.getMessage()).build(); }).flatMap(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR) - .contentType(MediaType.APPLICATION_JSON_UTF8) - .body(fromObject(error))); + .contentType(MediaType.APPLICATION_JSON) + .body(fromValue(error))); } } diff --git a/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateFacadeImpl.java b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateFacadeImpl.java index 6459f0a..b117331 100755 --- a/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateFacadeImpl.java +++ b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateFacadeImpl.java @@ -44,7 +44,7 @@ public class AggregateFacadeImpl implements AggregateFa public Mono executeReturnEvent(C command, String aggregateID, Class eventType) { var result = aggregateRepository.execute(command, new Id(aggregateID)); return result.flatMap(cr -> processResult(aggregateID, eventType, cr)) - .doOnError(error -> log.error("execute -> event type: "+eventType, error)); + .doOnError(error -> log.error("execute -> event type: " + eventType, error)); } @Override diff --git a/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java index 7af9652..8c9aa0f 100755 --- a/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java +++ b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java @@ -23,87 +23,88 @@ import static java.util.stream.Collectors.toList; public class AggregateRepositoryImpl implements AggregateRepository { - private final AtomicReference>> eventSource = new AtomicReference<>(new HashMap<>()); + private final AtomicReference>> eventSource = new AtomicReference<>(new HashMap<>()); - private final Cache> cache = CacheBuilder.newBuilder() - .expireAfterAccess(Duration.ofMinutes(10)) - .removalListener((RemovalListener>) notification -> notification.getValue().onDestroy()) - .build(); + private final Cache> cache = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(10)) + .removalListener((RemovalListener>) notification -> notification.getValue().onDestroy()) + .build(); - private final CommandHandler commandHandler; + private final CommandHandler commandHandler; - private final EventHandler eventHandler; + private final EventHandler eventHandler; - private final Function aggregateFactory; + private final Function aggregateFactory; - private final Function stateFactory; + private final Function stateFactory; - public AggregateRepositoryImpl( - CommandHandler commandHandler, - EventHandler eventHandler, - Function aggregateFactory, - Function stateFactory - ) { - this.commandHandler = commandHandler; - this.eventHandler = eventHandler; - this.aggregateFactory = aggregateFactory; - this.stateFactory = stateFactory; + public AggregateRepositoryImpl( + CommandHandler commandHandler, + EventHandler eventHandler, + Function aggregateFactory, + Function stateFactory + ) { + this.commandHandler = commandHandler; + this.eventHandler = eventHandler; + this.aggregateFactory = aggregateFactory; + this.stateFactory = stateFactory; + } + + private List persistAll(Id id, List events) { + eventSource.updateAndGet(esMap -> { + var eventsToStore = Optional.ofNullable(esMap.get(id)) + .map(es -> Stream.concat(es.stream(), events.stream()) + .sorted(Comparator.comparing(DomainEvent::createdAt)) + .collect(toList())) + .orElse((List) events); + esMap.put(id, eventsToStore); + return esMap; + }); + return events; + } + + private Mono> getAggregate(Id id) { + return Mono.just(id) + .map(this::getFromCache); + } + + private AggregateActor getFromCache(Id id) { + try { + return cache.get(id, () -> + new AggregateActorImpl<>( + id, + commandHandler, + eventHandler, + aggregateFactory, + Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of), + this::persistAll + ) + ); + } catch (Exception e) { + throw new RuntimeException(e); } + } - private List persistAll(Id id, List events) { - eventSource.updateAndGet(esMap -> { - var eventsToStore = Optional.ofNullable(esMap.get(id)) - .map(es -> Stream.concat(es.stream(), events.stream()) - .sorted(Comparator.comparing(DomainEvent::createdAt)) - .collect(toList())) - .orElse((List) events); - esMap.put(id, eventsToStore); - return esMap; - }); - return events; - } + @Override + public Mono execute(C cmd, Id aggregateId) { + return getAggregate(aggregateId) + .flatMap(a -> a.execute(cmd)); + } - private Mono> getAggregate(Id id) { - return Mono.just(id) - .map(this::getFromCache); - } + @Override + public Mono findById(Id id) { + return getAggregate(id).flatMap(a -> a.getState(this.stateFactory)); + } - private AggregateActor getFromCache(Id id) { - try { - return cache.get(id, () -> - new AggregateActorImpl( - id, - commandHandler, - eventHandler, - aggregateFactory, - Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of), - this::persistAll - ) - ); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public Mono execute(C cmd, Id aggregateId) { - return getAggregate(aggregateId) - .flatMap(a -> a.execute(cmd)); - } - - @Override - public Mono findById(Id id) { - return getAggregate(id).flatMap(a -> a.getState(this.stateFactory)); - } - - @Override - public Mono findIfExists(Id id) { - return Mono.just(id) - .flatMap(i -> Optional.ofNullable(cache.getIfPresent(i)) - .map(a -> a.getState(this.stateFactory)) - .orElseGet(Mono::empty) - ); - } + @Override + public Mono findIfExists(Id id) { + return Mono.just(id) + .flatMap( + i -> Optional.ofNullable(cache.getIfPresent(i)) + .map(a -> a.getState(this.stateFactory)) + .orElseGet(Mono::empty) + ); + } } diff --git a/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/Query.java b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/Query.java new file mode 100644 index 0000000..828d0b6 --- /dev/null +++ b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/Query.java @@ -0,0 +1,20 @@ +package com.mz.reactor.ddd.reactorddd.persistance.query; + +import com.mz.reactor.ddd.common.api.view.DomainView; +import com.mz.reactor.ddd.reactorddd.persistance.query.impl.QueryImpl; +import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.Supplier; + +public interface Query { + + Mono findById(String id); + + Flux getAll(); + + static Query of(ViewRepository viewRepository, Supplier> documentStream) { + return new QueryImpl<>(viewRepository, documentStream); + } +} diff --git a/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/impl/QueryImpl.java b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/impl/QueryImpl.java new file mode 100644 index 0000000..b99df2b --- /dev/null +++ b/common-persistence-api/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/query/impl/QueryImpl.java @@ -0,0 +1,39 @@ +package com.mz.reactor.ddd.reactorddd.persistance.query.impl; + +import com.mz.reactor.ddd.common.api.view.DomainView; +import com.mz.reactor.ddd.reactorddd.persistance.query.Query; +import com.mz.reactor.ddd.reactorddd.persistance.view.impl.ViewRepository; +import org.springframework.lang.NonNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class QueryImpl implements Query { + + private final ViewRepository repository; + + private final Predicate getAll = v -> true; + + public QueryImpl( + ViewRepository repository, + Supplier> documentStream + ) { + this.repository = requireNonNull(repository, "repository is required"); + requireNonNull(documentStream, "documentStream is required").get() + .subscribe(repository::addView); + } + + @Override + public Mono findById(@NonNull String id) { + return repository.findBy(view -> view.id().equals(id)); + } + + @Override + public Flux getAll() { + return repository.findAllBy(getAll); + } +}