From 3d4e5d448f31b9e72bc091c49fa53ef73d748466 Mon Sep 17 00:00:00 2001 From: Michal Zeman <> Date: Fri, 15 Nov 2019 14:25:15 +0100 Subject: [PATCH] AggregateRepository improvements -> add AtomicReference for eventStore --- .../impl/AggregateRepositoryImpl.java | 31 +++++----- .../aggregate/impl/AggregateActorTest.java | 15 +++-- .../impl/AggregateRepositoryImplTest.java | 56 ++++++++++++++++++- .../aggregate/impl/TestAggregate.java | 15 ++++- .../aggregate/impl/TestAggregateEvent.java | 17 ++++++ .../aggregate/impl/TestFunctions.java | 45 ++++++--------- 6 files changed, 128 insertions(+), 51 deletions(-) diff --git a/common-persistance/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java b/common-persistance/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java index a4c0a34..eb254f8 100644 --- a/common-persistance/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java +++ b/common-persistance/src/main/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImpl.java @@ -14,15 +14,18 @@ import com.mz.reactor.ddd.reactorddd.persistance.aggregate.AggregateRepository; import reactor.core.publisher.Mono; import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; public class AggregateRepositoryImpl implements AggregateRepository { - private final Map> eventSource = new HashMap<>(); +// private final Map> eventSource = new HashMap<>(); + + private final AtomicReference>> eventSource = new AtomicReference<>(new HashMap<>()); private final Cache> cache = CacheBuilder.newBuilder() .expireAfterAccess(Duration.ofMinutes(10)) @@ -51,13 +54,15 @@ public class AggregateRepositoryImpl implements Aggrega } private List persistAll(Id id, List events) { - var eventsToStore = Optional.ofNullable(eventSource.get(id)) - .map(es -> { - es.addAll(events); - return es; - }) - .orElse(events); - eventSource.put(id, eventsToStore); + eventSource.updateAndGet(esMap -> { + var eventsToStore = Optional.ofNullable(esMap.get(id)) + .map(es -> Stream.concat(es.stream(), events.stream()) + .sorted(Comparator.comparing(DomainEvent::createdAt)) + .collect(toList())) + .orElse(events); + esMap.put(id, eventsToStore); + return esMap; + }); return events; } @@ -74,7 +79,7 @@ public class AggregateRepositoryImpl implements Aggrega commandHandler, eventApplier, aggregateFactory, - Optional.ofNullable(eventSource.get(id)).orElseGet(List::of), + Optional.ofNullable(eventSource.get().get(id)).orElseGet(List::of), this::persistAll)); } catch (Exception e) { throw new RuntimeException(e); diff --git a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateActorTest.java b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateActorTest.java index 8187f3e..bd65a0a 100644 --- a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateActorTest.java +++ b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateActorTest.java @@ -90,15 +90,17 @@ class AggregateActorTest { TestFunctions.FN.eventApplier, TestFunctions.FN.aggregateFactory, List.of(TestAggregateEvent.newBuilder() - .withAmount(10l) + .withAmount(10L) .build(), TestAggregateEvent.newBuilder() - .withAmount(12l) + .withAmount(12L) .build()), TestFunctions.FN.persistAll ); - var rr = subject.getState(a -> a.getValue()); - assertThat(rr).isEqualTo(22l); + Mono source = subject.getState(TestAggregate::getValue); + StepVerifier.create(source) + .expectNext(22L) + .verifyComplete(); } @Test @@ -123,7 +125,10 @@ class AggregateActorTest { subject.execute(cmd).block(); - assertThat(subject.getState(a -> a.getValue())).isEqualTo(32); + Mono source = subject.getState(a -> a.getValue()); + StepVerifier.create(source) + .expectNext(32L) + .verifyComplete(); } } \ No newline at end of file diff --git a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImplTest.java b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImplTest.java index dfd9879..db73d85 100644 --- a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImplTest.java +++ b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/AggregateRepositoryImplTest.java @@ -1,16 +1,70 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl; +import com.mz.reactor.ddd.common.api.valueobject.Id; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; +import java.time.Duration; +import java.util.UUID; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; class AggregateRepositoryImplTest { + private final Function> getState = a -> Tuples.of(a.getId(), a.getValue()); + + AggregateRepositoryImpl> subject = + new AggregateRepositoryImpl<>( + TestFunctions.FN.commandHandler, + TestFunctions.FN.eventApplier, + TestFunctions.FN.aggregateFactory, + getState + ); + @Test - void execute() { + void findByIdParallel() { + var source = Flux.range(0, 100) + .parallel(10) + .flatMap(i -> { + var cmd = TestAggregateCommand.newBuilder() + .withCommandId(String.format("comman: %s", i)) + .withValue(i) + .build(); + return subject.execute(cmd, new Id(String.valueOf(i))); + }); + + StepVerifier.create(source) + .expectNextCount(100) + .verifyComplete(); } @Test void findById() { + var cmdId = UUID.randomUUID().toString(); + var cmd = TestAggregateCommand.newBuilder() + .withCommandId(cmdId) + .withValue(10) + .build(); + + var id1 = new Id(UUID.randomUUID().toString()); + var id2 = new Id(UUID.randomUUID().toString()); + + + subject.execute(cmd, id1).block(); + subject.execute(cmd, id2).block(); + subject.execute(cmd, id1).block(); + subject.execute(cmd, id2).block(); + subject.execute(cmd, id2).block(); + subject.execute(cmd, id2).block(); + subject.execute(cmd, id1).block(); + subject.execute(cmd, id1).delayElement(Duration.ofSeconds(10)).block(); + subject.execute(cmd, id1).block(); + assertThat(subject.findById(id1).block().getT2()).isEqualTo(50); + assertThat(subject.findById(id2).block().getT2()).isEqualTo(40); } } \ No newline at end of file diff --git a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregate.java b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregate.java index 76cfb64..f0a9ef1 100644 --- a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregate.java +++ b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregate.java @@ -1,15 +1,20 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl; +import com.mz.reactor.ddd.common.api.valueobject.Id; + public class TestAggregate { + private final Id id; + private long value; - public TestAggregate() { - this(0); + public TestAggregate(Id id) { + this(0, id); } - public TestAggregate(long value) { + public TestAggregate(long value, Id id) { this.value = value; + this.id = id; } public TestAggregateEvent validate(TestAggregateCommand command) { @@ -26,4 +31,8 @@ public class TestAggregate { public long getValue() { return value; } + + public Id getId() { + return id; + } } diff --git a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregateEvent.java b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregateEvent.java index 663c982..81dcfd7 100644 --- a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregateEvent.java +++ b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestAggregateEvent.java @@ -2,6 +2,7 @@ package com.mz.reactor.ddd.reactorddd.persistance.aggregate.impl; import com.mz.reactor.ddd.common.api.event.DomainEvent; +import java.time.Instant; import java.util.Optional; import java.util.UUID; @@ -13,10 +14,13 @@ public class TestAggregateEvent implements DomainEvent { private String eventId; + private final Instant createdAt; + private TestAggregateEvent(Builder builder) { correlationId = builder.correlationId; amount = builder.amount; eventId = builder.eventId; + createdAt = builder.createdAt; } public static Builder newBuilder() { @@ -28,6 +32,7 @@ public class TestAggregateEvent implements DomainEvent { builder.correlationId = copy.getCorrelationId(); builder.amount = copy.getAmount(); builder.eventId = copy.getEventId(); + builder.createdAt = copy.createdAt(); return builder; } @@ -55,6 +60,7 @@ public class TestAggregateEvent implements DomainEvent { private String correlationId; private Long amount; private String eventId; + private Instant createdAt = Instant.now(); private Builder() { } @@ -92,6 +98,17 @@ public class TestAggregateEvent implements DomainEvent { return this; } + /** + * Sets the {@code createdAt} and returns a reference to this Builder so that the methods can be chained together. + * + * @param val the {@code createdAt} to set + * @return a reference to this Builder + */ + public Builder withCreatedAt(Instant val) { + createdAt = val; + return this; + } + /** * Returns a {@code TestAggregateEvent} built from the parameters previously set. * diff --git a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestFunctions.java b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestFunctions.java index 0b1b6e3..236fcbf 100644 --- a/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestFunctions.java +++ b/common-persistance/src/test/java/com/mz/reactor/ddd/reactorddd/persistance/aggregate/impl/TestFunctions.java @@ -13,39 +13,26 @@ import java.util.function.Function; public enum TestFunctions { FN; - public final CommandHandler commandHandler = new CommandHandler() { - @Override - public CommandResult execute(TestAggregate aggregate, TestAggregateCommand command) { - try { - var event = aggregate.validate(command); - return CommandResult.builder() - .commandId(command.commandId()) - .statusCode(CommandResult.StatusCode.OK) - .addEvents(event) - .build(); - } catch (Exception e) { - return CommandResult.fromError( - new RuntimeException(e), - null, - command - ); - } + public final CommandHandler commandHandler = (aggregate, command) -> { + try { + var event = aggregate.validate(command); + return CommandResult.builder() + .commandId(command.commandId()) + .statusCode(CommandResult.StatusCode.OK) + .addEvents(event) + .build(); + } catch (Exception e) { + return CommandResult.fromError( + new RuntimeException(e), + null, + command + ); } }; - public final EventApplier eventApplier = new EventApplier() { - @Override - public TestAggregate apply(TestAggregate aggregate, DomainEvent event) { - return aggregate.apply((TestAggregateEvent) event); - } - }; + public final EventApplier eventApplier = (aggregate, event) -> aggregate.apply((TestAggregateEvent) event); - public final Function aggregateFactory = new Function() { - @Override - public TestAggregate apply(Id id) { - return new TestAggregate(); - } - }; + public final Function aggregateFactory = TestAggregate::new; public final BiFunction, List> persistAll = (id, events) -> events;