AggregateRepository improvements -> add AtomicReference for eventStore

This commit is contained in:
Michal Zeman
2019-11-15 14:25:15 +01:00
parent 35b4fddcbd
commit 3d4e5d448f
6 changed files with 128 additions and 51 deletions

View File

@@ -14,15 +14,18 @@ import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
public class AggregateRepositoryImpl<A, C extends Command, S> implements AggregateRepository<A, C, S> {
private final Map<Id, List<DomainEvent>> eventSource = new HashMap<>();
// private final Map<Id, List<DomainEvent>> eventSource = new HashMap<>();
private final AtomicReference<Map<Id, List<DomainEvent>>> eventSource = new AtomicReference<>(new HashMap<>());
private final Cache<Id, AggregateActor<A, C>> cache = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
@@ -51,13 +54,15 @@ public class AggregateRepositoryImpl<A, C extends Command, S> implements Aggrega
}
private List<DomainEvent> persistAll(Id id, List<DomainEvent> events) {
var eventsToStore = Optional.ofNullable(eventSource.get(id))
.map(es -> {
es.addAll(events);
return es;
})
.orElse(events);
eventSource.put(id, eventsToStore);
eventSource.updateAndGet(esMap -> {
var eventsToStore = Optional.ofNullable(esMap.get(id))
.map(es -> Stream.concat(es.stream(), events.stream())
.sorted(Comparator.comparing(DomainEvent::createdAt))
.collect(toList()))
.orElse(events);
esMap.put(id, eventsToStore);
return esMap;
});
return events;
}
@@ -74,7 +79,7 @@ public class AggregateRepositoryImpl<A, C extends Command, S> implements Aggrega
commandHandler,
eventApplier,
aggregateFactory,
Optional.ofNullable(eventSource.get(id)).orElseGet(List::of),
Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of),
this::persistAll));
} catch (Exception e) {
throw new RuntimeException(e);

View File

@@ -90,15 +90,17 @@ class AggregateActorTest {
TestFunctions.FN.eventApplier,
TestFunctions.FN.aggregateFactory,
List.of(TestAggregateEvent.newBuilder()
.withAmount(10l)
.withAmount(10L)
.build(), TestAggregateEvent.newBuilder()
.withAmount(12l)
.withAmount(12L)
.build()),
TestFunctions.FN.persistAll
);
var rr = subject.getState(a -> a.getValue());
assertThat(rr).isEqualTo(22l);
Mono<Long> source = subject.getState(TestAggregate::getValue);
StepVerifier.create(source)
.expectNext(22L)
.verifyComplete();
}
@Test
@@ -123,7 +125,10 @@ class AggregateActorTest {
subject.execute(cmd).block();
assertThat(subject.<Long>getState(a -> a.getValue())).isEqualTo(32);
Mono<Long> source = subject.getState(a -> a.getValue());
StepVerifier.create(source)
.expectNext(32L)
.verifyComplete();
}
}

View File

@@ -1,16 +1,70 @@
package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl;
import com.mz.reactor.ddd.common.api.valueobject.Id;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.time.Duration;
import java.util.UUID;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
class AggregateRepositoryImplTest {
private final Function<TestAggregate, Tuple2<Id, Long>> getState = a -> Tuples.of(a.getId(), a.getValue());
AggregateRepositoryImpl<TestAggregate, TestAggregateCommand, Tuple2<Id, Long>> subject =
new AggregateRepositoryImpl<>(
TestFunctions.FN.commandHandler,
TestFunctions.FN.eventApplier,
TestFunctions.FN.aggregateFactory,
getState
);
@Test
void execute() {
void findByIdParallel() {
var source = Flux.range(0, 100)
.parallel(10)
.flatMap(i -> {
var cmd = TestAggregateCommand.newBuilder()
.withCommandId(String.format("comman: %s", i))
.withValue(i)
.build();
return subject.execute(cmd, new Id(String.valueOf(i)));
});
StepVerifier.create(source)
.expectNextCount(100)
.verifyComplete();
}
@Test
void findById() {
var cmdId = UUID.randomUUID().toString();
var cmd = TestAggregateCommand.newBuilder()
.withCommandId(cmdId)
.withValue(10)
.build();
var id1 = new Id(UUID.randomUUID().toString());
var id2 = new Id(UUID.randomUUID().toString());
subject.execute(cmd, id1).block();
subject.execute(cmd, id2).block();
subject.execute(cmd, id1).block();
subject.execute(cmd, id2).block();
subject.execute(cmd, id2).block();
subject.execute(cmd, id2).block();
subject.execute(cmd, id1).block();
subject.execute(cmd, id1).delayElement(Duration.ofSeconds(10)).block();
subject.execute(cmd, id1).block();
assertThat(subject.findById(id1).block().getT2()).isEqualTo(50);
assertThat(subject.findById(id2).block().getT2()).isEqualTo(40);
}
}

View File

@@ -1,15 +1,20 @@
package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl;
import com.mz.reactor.ddd.common.api.valueobject.Id;
public class TestAggregate {
private final Id id;
private long value;
public TestAggregate() {
this(0);
public TestAggregate(Id id) {
this(0, id);
}
public TestAggregate(long value) {
public TestAggregate(long value, Id id) {
this.value = value;
this.id = id;
}
public TestAggregateEvent validate(TestAggregateCommand command) {
@@ -26,4 +31,8 @@ public class TestAggregate {
public long getValue() {
return value;
}
public Id getId() {
return id;
}
}

View File

@@ -2,6 +2,7 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl;
import com.mz.reactor.ddd.common.api.event.DomainEvent;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
@@ -13,10 +14,13 @@ public class TestAggregateEvent implements DomainEvent {
private String eventId;
private final Instant createdAt;
private TestAggregateEvent(Builder builder) {
correlationId = builder.correlationId;
amount = builder.amount;
eventId = builder.eventId;
createdAt = builder.createdAt;
}
public static Builder newBuilder() {
@@ -28,6 +32,7 @@ public class TestAggregateEvent implements DomainEvent {
builder.correlationId = copy.getCorrelationId();
builder.amount = copy.getAmount();
builder.eventId = copy.getEventId();
builder.createdAt = copy.createdAt();
return builder;
}
@@ -55,6 +60,7 @@ public class TestAggregateEvent implements DomainEvent {
private String correlationId;
private Long amount;
private String eventId;
private Instant createdAt = Instant.now();
private Builder() {
}
@@ -92,6 +98,17 @@ public class TestAggregateEvent implements DomainEvent {
return this;
}
/**
* Sets the {@code createdAt} and returns a reference to this Builder so that the methods can be chained together.
*
* @param val the {@code createdAt} to set
* @return a reference to this Builder
*/
public Builder withCreatedAt(Instant val) {
createdAt = val;
return this;
}
/**
* Returns a {@code TestAggregateEvent} built from the parameters previously set.
*

View File

@@ -13,39 +13,26 @@ import java.util.function.Function;
public enum TestFunctions {
FN;
public final CommandHandler<TestAggregate, TestAggregateCommand> commandHandler = new CommandHandler<TestAggregate, TestAggregateCommand>() {
@Override
public CommandResult execute(TestAggregate aggregate, TestAggregateCommand command) {
try {
var event = aggregate.validate(command);
return CommandResult.builder()
.commandId(command.commandId())
.statusCode(CommandResult.StatusCode.OK)
.addEvents(event)
.build();
} catch (Exception e) {
return CommandResult.fromError(
new RuntimeException(e),
null,
command
);
}
public final CommandHandler<TestAggregate, TestAggregateCommand> commandHandler = (aggregate, command) -> {
try {
var event = aggregate.validate(command);
return CommandResult.builder()
.commandId(command.commandId())
.statusCode(CommandResult.StatusCode.OK)
.addEvents(event)
.build();
} catch (Exception e) {
return CommandResult.fromError(
new RuntimeException(e),
null,
command
);
}
};
public final EventApplier<TestAggregate, DomainEvent> eventApplier = new EventApplier<TestAggregate, DomainEvent>() {
@Override
public TestAggregate apply(TestAggregate aggregate, DomainEvent event) {
return aggregate.apply((TestAggregateEvent) event);
}
};
public final EventApplier<TestAggregate, DomainEvent> eventApplier = (aggregate, event) -> aggregate.apply((TestAggregateEvent) event);
public final Function<Id, TestAggregate> aggregateFactory = new Function<Id, TestAggregate>() {
@Override
public TestAggregate apply(Id id) {
return new TestAggregate();
}
};
public final Function<Id, TestAggregate> aggregateFactory = TestAggregate::new;
public final BiFunction<Id, List<DomainEvent>, List<DomainEvent>> persistAll = (id, events) -> events;