transfer money operation

This commit is contained in:
Michal Zeman
2019-12-14 11:51:46 +01:00
parent 65381f7f50
commit edf063d0c4
72 changed files with 1421 additions and 276 deletions

View File

@@ -7,12 +7,12 @@ import reactor.core.publisher.Mono;
import java.util.function.Function;
public interface AggregateActor<A, C extends Command, E extends DomainEvent> {
public interface AggregateActor<A, C extends Command> {
<S> Mono<S> getState(Function<A, S> stateFactory);
void onDestroy();
Mono<CommandResult<E>> execute(C cmd);
Mono<CommandResult> execute(C cmd);
}

View File

@@ -4,9 +4,9 @@ import com.mz.reactor.ddd.common.api.command.Command;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import reactor.core.publisher.Mono;
public interface AggregateFacade<A, C extends Command, E extends DomainEvent,S> {
public interface AggregateFacade<A, C extends Command,S> {
Mono<S> execute(C command, String aggregateID);
Mono<E> executeReturnEvent(C command, String aggregateID);
Mono<? extends DomainEvent> executeReturnEvent(C command, String aggregateID, Class<? extends DomainEvent> event);
}

View File

@@ -6,9 +6,9 @@ import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.valueobject.Id;
import reactor.core.publisher.Mono;
public interface AggregateRepository<A, C extends Command, E extends DomainEvent,S> {
public interface AggregateRepository<A, C extends Command,S> {
Mono<CommandResult<E>> execute(C cmd, Id aggregateId);
Mono<CommandResult> execute(C cmd, Id aggregateId);
Mono<S> findById(Id id);
}

View File

@@ -7,6 +7,8 @@ import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.event.EventApplier;
import com.mz.reactor.ddd.common.api.valueobject.Id;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateActor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@@ -19,15 +21,17 @@ import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
public class AggregateActorImpl<A, C extends Command, E extends DomainEvent> implements AggregateActor<A, C, E> {
public class AggregateActorImpl<A, C extends Command> implements AggregateActor<A, C> {
private static final Log log = LogFactory.getLog(AggregateActorImpl.class);
private final Id id;
private final CommandHandler<A, C> commandHandler;
private final EventApplier<A, E> eventApplier;
private final EventApplier<A> eventApplier;
private final BiFunction<Id, List<E>, List<E>> persistAll;
private final BiFunction<Id, List<? extends DomainEvent>, List<? extends DomainEvent>> persistAll;
private A aggregate;
@@ -35,17 +39,17 @@ public class AggregateActorImpl<A, C extends Command, E extends DomainEvent> imp
private final FluxSink<C> commandSink = commandProcessor.sink();
private final ReplayProcessor<CommandResult<E>> commandResultReplayProcessor = ReplayProcessor.create();
private final ReplayProcessor<CommandResult> commandResultReplayProcessor = ReplayProcessor.create();
private final FluxSink<CommandResult<E>> commandResultSink = commandResultReplayProcessor.sink();
private final FluxSink<CommandResult> commandResultSink = commandResultReplayProcessor.sink();
public AggregateActorImpl(
Id id,
CommandHandler<A, C> commandHandler,
EventApplier<A, E> eventApplier,
EventApplier<A> eventApplier,
Function<Id, A> aggregateFactory,
List<E> domainEvents,
BiFunction<Id, List<E>, List<E>> persistAll
List<? extends DomainEvent> domainEvents,
BiFunction<Id, List<? extends DomainEvent>, List<? extends DomainEvent>> persistAll
) {
this.id = id;
this.commandHandler = commandHandler;
@@ -60,11 +64,12 @@ public class AggregateActorImpl<A, C extends Command, E extends DomainEvent> imp
commandProcessor
.publishOn(Schedulers.newSingle(String.format("AggregateActor: %s", id)))
.log()
.doOnError(error -> log.error("handleCommand -> ", error))
.subscribe(this::handleCommand);
}
private void handleCommand(C cmd) {
var commandResult = commandHandler.<E>execute(this.aggregate, cmd);
var commandResult = commandHandler.execute(this.aggregate, cmd);
this.aggregate = (A) persistAll
.andThen(events -> events.stream()
.reduce(this.aggregate, eventApplier::apply, (a1, a2) -> a2))
@@ -73,8 +78,8 @@ public class AggregateActorImpl<A, C extends Command, E extends DomainEvent> imp
}
@Override
public Mono<CommandResult<E>> execute(C cmd) {
Mono<CommandResult<E>> result = commandResultReplayProcessor.publishOn(Schedulers.elastic())
public Mono<CommandResult> execute(C cmd) {
Mono<CommandResult> result = commandResultReplayProcessor.publishOn(Schedulers.elastic())
.filter(r -> r.commandId().equals(cmd.commandId()))
.next();
commandSink.next(cmd);

View File

@@ -7,20 +7,25 @@ import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.valueobject.Id;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateFacade;
import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class AggregateFacadeImpl<A, C extends Command, E extends DomainEvent ,S> implements AggregateFacade<A, C, E, S> {
public class AggregateFacadeImpl<A, C extends Command, S> implements AggregateFacade<A, C, S> {
private final AggregateRepository<A, C, E, S> aggregateRepository;
private static final Log log = LogFactory.getLog(AggregateFacadeImpl.class);
private final AggregateRepository<A, C, S> aggregateRepository;
private final Consumer<Message> publishChanged;
private final Consumer<S> publishDocument;
public AggregateFacadeImpl(
AggregateRepository<A, C, E, S> aggregateRepository,
AggregateRepository<A, C, S> aggregateRepository,
Consumer<Message> publishChanged,
Consumer<S> publishDocument
) {
@@ -32,29 +37,60 @@ public class AggregateFacadeImpl<A, C extends Command, E extends DomainEvent ,S>
@Override
public Mono<S> execute(C command, String aggregateID) {
return aggregateRepository.execute(command, new Id(aggregateID))
.flatMap(processResult(aggregateID));
.flatMap(cr -> processResult(aggregateID, cr))
.doOnError(error -> log.error("execute -> ", error));
}
@Override
public Mono<E> executeReturnEvent(C command, String aggregateID) {
public Mono<? extends DomainEvent> executeReturnEvent(C command, String aggregateID, Class<? extends DomainEvent> eventType) {
var result = aggregateRepository.execute(command, new Id(aggregateID));
return result.map(r -> r.events().stream().findAny().get());
return result.flatMap(cr -> processResult(aggregateID, eventType, cr))
.doOnError(error -> log.error("execute -> event type: "+eventType, error));
}
private Function<CommandResult<E>, Mono<S>> processResult(String aggregateId) {
return result -> {
switch (result.statusCode()) {
case OK:
return aggregateRepository.findById(new Id(aggregateId))
.doOnSuccess(publishDocument)
.doOnSuccess(s -> result.events().forEach(publishChanged));
case FAILED:
return Mono.error(result.error().orElseGet(() -> new RuntimeException("Generic error")));
case NOT_MODIFIED:
default:
return Mono.empty();
}
};
private Mono<? extends DomainEvent> processResult(String aggregateId, Class<? extends DomainEvent> eventType, CommandResult result) {
switch (result.statusCode()) {
case OK:
return publishChanges(aggregateId, result)
.map(state -> result.events().stream()
.filter(e -> isInstance(e, eventType))
.map(eventType::cast)
.findAny())
.map(Optional::get);
case FAILED:
return onFailed(result);
case NOT_MODIFIED:
default:
return Mono.empty();
}
}
private Mono<S> processResult(String aggregateId, CommandResult result) {
switch (result.statusCode()) {
case OK:
return publishChanges(aggregateId, result);
case FAILED:
return onFailed(result);
case NOT_MODIFIED:
default:
return Mono.empty();
}
}
private <T> Mono<T> onFailed(CommandResult result) {
result.events().forEach(publishChanged);
return Mono.error(result.error().orElseGet(() -> new RuntimeException("Generic error")));
}
private Mono<S> publishChanges(String aggregateId, CommandResult result) {
return aggregateRepository.findById(new Id(aggregateId))
.doOnSuccess(publishDocument)
.doOnSuccess(s -> result.events().forEach(publishChanged));
}
protected <E> boolean isInstance(Object obj, Class<E> type) {
return Optional.ofNullable(type)
.flatMap(t -> Optional.ofNullable(obj).map(t::isInstance)).orElse(false);
}
}

View File

@@ -21,19 +21,19 @@ import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
public class AggregateRepositoryImpl<A, C extends Command, E extends DomainEvent,S> implements AggregateRepository<A, C, E, S> {
public class AggregateRepositoryImpl<A, C extends Command,S> implements AggregateRepository<A, C, S> {
private final AtomicReference<Map<Id, List<E>>> eventSource = new AtomicReference<>(new HashMap<>());
private final AtomicReference<Map<Id, List<? extends DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>());
private final Cache<Id, AggregateActor<A, C, E>> cache = CacheBuilder.newBuilder()
private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
.removalListener((RemovalListener<Id, AggregateActor<A, C, E>>) notification -> notification.getValue().onDestroy())
.removalListener((RemovalListener<Id, AggregateActor<A, C>>) notification -> notification.getValue().onDestroy())
.build();
private final CommandHandler<A, C> commandHandler;
private final EventApplier<A, E> eventApplier;
private final EventApplier<A> eventApplier;
private final Function<Id, A> aggregateFactory;
@@ -41,7 +41,7 @@ public class AggregateRepositoryImpl<A, C extends Command, E extends DomainEvent
public AggregateRepositoryImpl(
CommandHandler<A, C> commandHandler,
EventApplier<A, E> eventApplier,
EventApplier<A> eventApplier,
Function<Id, A> aggregateFactory,
Function<A, S> stateFactory
) {
@@ -51,28 +51,28 @@ public class AggregateRepositoryImpl<A, C extends Command, E extends DomainEvent
this.stateFactory = stateFactory;
}
private List<E> persistAll(Id id, List<E> events) {
private List<? extends DomainEvent> persistAll(Id id, List<? extends DomainEvent> events) {
eventSource.updateAndGet(esMap -> {
var eventsToStore = Optional.ofNullable(esMap.get(id))
.map(es -> Stream.concat(es.stream(), events.stream())
.sorted(Comparator.comparing(DomainEvent::createdAt))
.collect(toList()))
.orElse(events);
.orElse((List<DomainEvent>) events);
esMap.put(id, eventsToStore);
return esMap;
});
return events;
}
private Mono<AggregateActor<A, C, E>> getAggregate(Id id) {
private Mono<AggregateActor<A, C>> getAggregate(Id id) {
return Mono.just(id)
.map(this::getFromCache);
}
private AggregateActor<A, C, E> getFromCache(Id id) {
private AggregateActor<A, C> getFromCache(Id id) {
try {
return cache.get(id, () ->
new AggregateActorImpl<>(
new AggregateActorImpl<A, C>(
id,
commandHandler,
eventApplier,
@@ -85,7 +85,7 @@ public class AggregateRepositoryImpl<A, C extends Command, E extends DomainEvent
}
@Override
public Mono<CommandResult<E>> execute(C cmd, Id aggregateId) {
public Mono<CommandResult> execute(C cmd, Id aggregateId) {
return getAggregate(aggregateId)
.flatMap(a -> a.execute(cmd));
}

View File

@@ -22,7 +22,7 @@ class AggregateActorTest {
.withValue(10)
.build();
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>(
new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,
@@ -31,7 +31,7 @@ class AggregateActorTest {
TestFunctions.FN.persistAll
);
Mono<CommandResult<TestAggregateEvent>> result = subject.execute(command);
Mono<CommandResult> result = subject.execute(command);
StepVerifier.create(result)
.expectNextMatches(r ->
r.commandId().equals(commandId) && r.statusCode().equals(CommandResult.StatusCode.OK))
@@ -40,7 +40,7 @@ class AggregateActorTest {
@Test
void onDestroy() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>(
new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,
@@ -60,7 +60,7 @@ class AggregateActorTest {
@Test
void executeParallel() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>(
new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,
@@ -84,7 +84,7 @@ class AggregateActorTest {
@Test
public void testRecoveryState() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>(
new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,
@@ -105,7 +105,7 @@ class AggregateActorTest {
@Test
public void test_RecoveryStateAndExecuteCommand() {
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent>(
var subject = new AggregateActorImpl<TestAggregate, TestAggregateCommand>(
new Id(UUID.randomUUID().toString()),
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,

View File

@@ -18,7 +18,7 @@ class AggregateRepositoryImplTest {
private final Function<TestAggregate, Tuple2<Id, Long>> getState = a -> Tuples.of(a.getId(), a.getValue());
AggregateRepositoryImpl<TestAggregate, TestAggregateCommand, TestAggregateEvent,Tuple2<Id, Long>> subject =
AggregateRepositoryImpl<TestAggregate, TestAggregateCommand,Tuple2<Id, Long>> subject =
new AggregateRepositoryImpl<>(
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,

View File

@@ -2,6 +2,7 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl;
import com.mz.reactor.ddd.common.api.command.CommandHandler;
import com.mz.reactor.ddd.common.api.command.CommandResult;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import com.mz.reactor.ddd.common.api.event.EventApplier;
import com.mz.reactor.ddd.common.api.valueobject.Id;
@@ -14,13 +15,13 @@ public enum TestFunctions {
public final CommandHandler<TestAggregate, TestAggregateCommand> commandHandler = new CommandHandler<TestAggregate, TestAggregateCommand>() {
@Override
public CommandResult<TestAggregateEvent> execute(TestAggregate aggregate, TestAggregateCommand command) {
public CommandResult execute(TestAggregate aggregate, TestAggregateCommand command) {
try {
TestAggregateEvent event = aggregate.validate(command);
return CommandResult.builder()
.commandId(command.commandId())
.statusCode(CommandResult.StatusCode.OK)
.addEvents(event)
.events(List.of(event))
.build();
} catch (Exception e) {
return CommandResult.fromError(
@@ -32,11 +33,16 @@ public enum TestFunctions {
}
};
public final EventApplier<TestAggregate, TestAggregateEvent> eventApplier = (aggregate, event) -> aggregate.apply((TestAggregateEvent) event);
public final EventApplier<TestAggregate> eventApplier = new EventApplier<TestAggregate>() {
@Override
public <E extends DomainEvent> TestAggregate apply(TestAggregate aggregate, E event) {
return aggregate.apply((TestAggregateEvent) event);
}
};
public final Function<Id, TestAggregate> aggregateFactory = TestAggregate::new;
public final BiFunction<Id, List<TestAggregateEvent>, List<TestAggregateEvent>> persistAll = (id, events) -> events;
public final BiFunction<Id, List<? extends DomainEvent>, List<? extends DomainEvent>> persistAll = (id, events) -> events;
public Function<String, String> mapTest1(String val1, String val2) {
return v -> val1 + " and " + val2 + " not " + v;

View File

@@ -2,9 +2,11 @@ package com.mz.reactor.ddd.reactorddd.persistance.model;
import com.mz.reactor.ddd.common.api.view.DomainView;
import java.time.Instant;
import java.util.Optional;
public class TestView implements DomainView {
private final String id;
private final String value;
private TestView(Builder builder) {
@@ -25,6 +27,11 @@ public class TestView implements DomainView {
return id;
}
@Override
public Optional<String> correlationId() {
return Optional.empty();
}
public static Builder newBuilder() {
return new Builder();
}