persistence module doc. update

This commit is contained in:
zemo
2020-08-16 15:41:52 +02:00
parent feb89d108e
commit 98748f9aa3
5 changed files with 88 additions and 72 deletions

View File

@@ -6,6 +6,11 @@ import reactor.core.publisher.Mono;
import java.util.function.Function; import java.util.function.Function;
/**
* Contract definition for Aggregate wrapper
* @param <A> - Aggregate type
* @param <C> - Command type
*/
public interface AggregateActor<A, C extends Command> { public interface AggregateActor<A, C extends Command> {
<S> Mono<S> getState(Function<A, S> stateFactory); <S> Mono<S> getState(Function<A, S> stateFactory);

View File

@@ -4,6 +4,12 @@ import com.mz.reactor.ddd.common.api.command.Command;
import com.mz.reactor.ddd.common.api.event.DomainEvent; import com.mz.reactor.ddd.common.api.event.DomainEvent;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* Aggregate facade contract for operations related with aggregate.
* @param <A> - Aggregate type
* @param <C> - Command type
* @param <S> - State type, representing a state of aggregate
*/
public interface AggregateFacade<A, C extends Command,S> { public interface AggregateFacade<A, C extends Command,S> {
Mono<S> execute(C command, String aggregateID); Mono<S> execute(C command, String aggregateID);

View File

@@ -1,4 +0,0 @@
package com.mz.reactor.ddd.reactorddd.persistance.aggregate;
public interface AggregateFactory {
}

View File

@@ -5,6 +5,12 @@ import com.mz.reactor.ddd.common.api.command.CommandResult;
import com.mz.reactor.ddd.common.api.valueobject.Id; import com.mz.reactor.ddd.common.api.valueobject.Id;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* {@link AggregateRepository} contract definition.
* @param <A> - Aggregate type
* @param <C> - Command type
* @param <S> - State type, representing a state of aggregate
*/
public interface AggregateRepository<A, C extends Command,S> { public interface AggregateRepository<A, C extends Command,S> {
Mono<CommandResult> execute(C cmd, Id aggregateId); Mono<CommandResult> execute(C cmd, Id aggregateId);

View File

@@ -23,84 +23,87 @@ import static java.util.stream.Collectors.toList;
public class AggregateRepositoryImpl<A, C extends Command, S> implements AggregateRepository<A, C, S> { public class AggregateRepositoryImpl<A, C extends Command, S> implements AggregateRepository<A, C, S> {
private final AtomicReference<Map<Id, List<? extends DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>()); private final AtomicReference<Map<Id, List<? extends DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>());
private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder() private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10)) .expireAfterAccess(Duration.ofMinutes(10))
.removalListener((RemovalListener<Id, AggregateActor<A, C>>) notification -> notification.getValue().onDestroy()) .removalListener((RemovalListener<Id, AggregateActor<A, C>>) notification -> notification.getValue().onDestroy())
.build(); .build();
private final CommandHandler<A, C> commandHandler; private final CommandHandler<A, C> commandHandler;
private final EventHandler<A> eventHandler; private final EventHandler<A> eventHandler;
private final Function<Id, A> aggregateFactory; private final Function<Id, A> aggregateFactory;
private final Function<A, S> stateFactory; private final Function<A, S> stateFactory;
public AggregateRepositoryImpl( public AggregateRepositoryImpl(
CommandHandler<A, C> commandHandler, CommandHandler<A, C> commandHandler,
EventHandler<A> eventHandler, EventHandler<A> eventHandler,
Function<Id, A> aggregateFactory, Function<Id, A> aggregateFactory,
Function<A, S> stateFactory Function<A, S> stateFactory
) { ) {
this.commandHandler = commandHandler; this.commandHandler = commandHandler;
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.stateFactory = stateFactory; this.stateFactory = stateFactory;
}
private List<? extends DomainEvent> persistAll(Id id, List<? extends DomainEvent> events) {
eventSource.updateAndGet(esMap -> {
var eventsToStore = Optional.ofNullable(esMap.get(id))
.map(es -> Stream.concat(es.stream(), events.stream())
.sorted(Comparator.comparing(DomainEvent::createdAt))
.collect(toList()))
.orElse((List<DomainEvent>) events);
esMap.put(id, eventsToStore);
return esMap;
});
return events;
}
private Mono<AggregateActor<A, C>> getAggregate(Id id) {
return Mono.just(id)
.map(this::getFromCache);
}
private AggregateActor<A, C> getFromCache(Id id) {
try {
return cache.get(id, () ->
new AggregateActorImpl<A, C>(
id,
commandHandler,
eventHandler,
aggregateFactory,
Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of),
this::persistAll));
} catch (Exception e) {
throw new RuntimeException(e);
} }
}
@Override private List<? extends DomainEvent> persistAll(Id id, List<? extends DomainEvent> events) {
public Mono<CommandResult> execute(C cmd, Id aggregateId) { eventSource.updateAndGet(esMap -> {
return getAggregate(aggregateId) var eventsToStore = Optional.ofNullable(esMap.get(id))
.flatMap(a -> a.execute(cmd)); .map(es -> Stream.concat(es.stream(), events.stream())
} .sorted(Comparator.comparing(DomainEvent::createdAt))
.collect(toList()))
.orElse((List<DomainEvent>) events);
esMap.put(id, eventsToStore);
return esMap;
});
return events;
}
@Override private Mono<AggregateActor<A, C>> getAggregate(Id id) {
public Mono<S> findById(Id id) { return Mono.just(id)
return getAggregate(id).flatMap(a -> a.getState(this.stateFactory)); .map(this::getFromCache);
} }
@Override private AggregateActor<A, C> getFromCache(Id id) {
public Mono<S> findIfExists(Id id) { try {
return Mono.just(id) return cache.get(id, () ->
.flatMap(i -> Optional.ofNullable(cache.getIfPresent(i)) new AggregateActorImpl<A, C>(
.map(a -> a.getState(this.stateFactory)) id,
.orElseGet(Mono::empty)); commandHandler,
} eventHandler,
aggregateFactory,
Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of),
this::persistAll
)
);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Mono<CommandResult> execute(C cmd, Id aggregateId) {
return getAggregate(aggregateId)
.flatMap(a -> a.execute(cmd));
}
@Override
public Mono<S> findById(Id id) {
return getAggregate(id).flatMap(a -> a.getState(this.stateFactory));
}
@Override
public Mono<S> findIfExists(Id id) {
return Mono.just(id)
.flatMap(i -> Optional.ofNullable(cache.getIfPresent(i))
.map(a -> a.getState(this.stateFactory))
.orElseGet(Mono::empty)
);
}
} }