Implement simple in-memory event store. | debugo (C) whatthecommit.com

This commit is contained in:
Maksim Kostromin
2019-08-01 02:31:02 +03:00
parent 1065744339
commit cad5ba9255
10 changed files with 272 additions and 0 deletions

View File

@@ -1,4 +1,5 @@
plugins {
id 'io.franzbecker.gradle-lombok' version '3.1.0'
id 'org.springframework.boot' version '2.2.0.M4'
id 'groovy'
id 'idea'
@@ -19,6 +20,11 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.codehaus.groovy:groovy'
implementation 'io.vavr:vavr:0.10.1'
annotationProcessor('org.projectlombok:lombok')
testCompileOnly('org.projectlombok:lombok')
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
exclude group: 'junit', module: 'junit'

View File

@@ -0,0 +1,33 @@
package com.github.daggerok.eventsourcing.user;
import com.github.daggerok.eventsourcing.user.event.DomainEvent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InMemoryUserRepository implements UserRepository {
private final Map<UUID, Collection<DomainEvent>> eventStore = new ConcurrentHashMap<>();
@Override
public void save(User user) {
Collection<DomainEvent> domainEvents = eventStore.getOrDefault(user.getUserId(), new ArrayList<>());
Collection<DomainEvent> newEvents = Stream.concat(domainEvents.stream(), user.getEventStream().stream())
.collect(Collectors.toList());
eventStore.put(user.getUserId(), newEvents);
user.flushEvents();
}
@Override
public User find(UUID userId) {
User snapshot = new User(userId);
return eventStore.containsKey(userId)
? User.recreate(snapshot, eventStore.get(userId))
: snapshot;
}
}

View File

@@ -0,0 +1,92 @@
package com.github.daggerok.eventsourcing.user;
import com.github.daggerok.eventsourcing.user.event.DomainEvent;
import com.github.daggerok.eventsourcing.user.event.UserActivated;
import com.github.daggerok.eventsourcing.user.event.UserDeactivated;
import io.vavr.API;
import lombok.Getter;
import lombok.ToString;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import static io.vavr.API.$;
import static io.vavr.API.Case;
import static io.vavr.Predicates.instanceOf;
/**
* Created user can be:
* - activated
* - deactivated
*
* Activated user can be:
* - deactivated
* Activated user cannot be:
* - activated
*
* Deactivated user can be:
* - activated
* Deactivated user cannot be:
* - deactivated
*/
@Getter
@ToString
public class User implements Function<DomainEvent, User> {
private final Collection<DomainEvent> eventStream = new CopyOnWriteArrayList<>();
private final UUID userId;
private UserStatus state;
public User(UUID userId) {
this.userId = userId;
state = UserStatus.PENDING;
}
public void flushEvents() {
eventStream.clear();
}
// cmd 1:
public void activate() {
if (state == UserStatus.ACTIVE)
throw new IllegalStateException("user is already active");
onActivate(new UserActivated(userId, ZonedDateTime.now()));
}
// evt: 1
private User onActivate(UserActivated event) {
eventStream.add(event);
state = UserStatus.ACTIVE;
return this;
}
// cmd 2:
public void deactivate() {
if (state == UserStatus.SUSPENDED)
throw new IllegalStateException("user is already suspended");
onDeactivate(new UserDeactivated(userId, ZonedDateTime.now()));
}
// evt 2:
private User onDeactivate(UserDeactivated event) {
eventStream.add(event);
state = UserStatus.SUSPENDED;
return this;
}
/* es */
public static User recreate(User snapshot, Collection<DomainEvent> domainEvents) {
return io.vavr.collection.List.ofAll(domainEvents)
.foldLeft(snapshot, User::apply);
}
@Override
public User apply(DomainEvent domainEvent) {
return API.Match(domainEvent).of(
Case($(instanceOf(UserActivated.class)), this::onActivate),
Case($(instanceOf(UserDeactivated.class)), this::onDeactivate)
);
}
}

View File

@@ -0,0 +1,8 @@
package com.github.daggerok.eventsourcing.user;
import java.util.UUID;
public interface UserRepository {
void save(User user);
User find(UUID userId);
}

View File

@@ -0,0 +1,5 @@
package com.github.daggerok.eventsourcing.user;
public enum UserStatus {
PENDING, ACTIVE, SUSPENDED
}

View File

@@ -0,0 +1,9 @@
package com.github.daggerok.eventsourcing.user.event;
import java.time.ZonedDateTime;
import java.util.UUID;
public interface DomainEvent {
UUID getAggregateId();
ZonedDateTime getAt();
}

View File

@@ -0,0 +1,15 @@
package com.github.daggerok.eventsourcing.user.event;
import lombok.*;
import java.time.ZonedDateTime;
import java.util.UUID;
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PACKAGE)
public class UserActivated implements DomainEvent {
UUID aggregateId;
ZonedDateTime at;
}

View File

@@ -0,0 +1,15 @@
package com.github.daggerok.eventsourcing.user.event;
import lombok.*;
import java.time.ZonedDateTime;
import java.util.UUID;
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PACKAGE)
public class UserDeactivated implements DomainEvent {
UUID aggregateId;
ZonedDateTime at;
}

View File

@@ -0,0 +1,34 @@
package com.github.daggerok.eventsourcing.user
import spock.lang.Specification
class UserRepositoryTest extends Specification {
def userRepository = new InMemoryUserRepository()
def 'save user operation should flush user eventStream'() {
given:
def user = new User(UUID.randomUUID())
and:
user.activate()
when:
userRepository.save(user)
then:
user.eventStream.size() == 0
}
def 'find operation should recreate user state'() {
given:
def userId = UUID.randomUUID()
and:
def user = new User(userId)
and:
user.activate()
and:
userRepository.save(user)
when:
def recreatedUser = userRepository.find(userId)
then:
recreatedUser.state == UserStatus.ACTIVE
}
}

View File

@@ -0,0 +1,55 @@
package com.github.daggerok.eventsourcing.user
import spock.lang.Specification
class UserTest extends Specification {
def 'created user should have pending state'() {
when:
def user = new User(UUID.randomUUID())
then:
user.state == UserStatus.PENDING
}
def 'active user should not be activated'() {
given:
def user = new User(UUID.randomUUID())
and:
user.activate()
when:
user.activate()
then:
thrown(IllegalStateException)
}
def 'created user can be activated'() {
given:
def user = new User(UUID.randomUUID())
when:
user.activate()
then:
user.state == UserStatus.ACTIVE
}
def 'suspended user cannot be deactivated'() {
given:
def user = new User(UUID.randomUUID())
and:
user.deactivate()
when:
user.deactivate()
then:
thrown(IllegalStateException)
}
def 'active user can be deactivated'() {
given:
def user = new User(UUID.randomUUID())
and:
user.activate()
when:
user.deactivate()
then:
user.state == UserStatus.SUSPENDED
}
}