Improve / fix event store. | TODO: write meaningful commit message (C) whatthecommit.com

This commit is contained in:
Maksim Kostromin
2019-08-01 03:03:54 +03:00
parent cad5ba9255
commit 6efd3ed502
8 changed files with 101 additions and 38 deletions

View File

@@ -1,6 +1,8 @@
# CQRS and event sourcing app [![Build Status](https://travis-ci.org/daggerok/cqrs-eventsourcing-user-management-example.svg?branch=master)](https://travis-ci.org/daggerok/cqrs-eventsourcing-user-management-example)
CQRS and event sourcing using dynamic groovy, spring-boot and spring-webflux
Status: _in progress, implemented in-memory event store only, follow updates..._
```bash
./gradlew bootRun

View File

@@ -25,9 +25,8 @@ public class InMemoryUserRepository implements UserRepository {
@Override
public User find(UUID userId) {
User snapshot = new User(userId);
return eventStore.containsKey(userId)
? User.recreate(snapshot, eventStore.get(userId))
: snapshot;
? User.recreate(userId, eventStore.get(userId))
: null;
}
}

View File

@@ -2,12 +2,11 @@ package com.github.daggerok.eventsourcing.user;
import com.github.daggerok.eventsourcing.user.event.DomainEvent;
import com.github.daggerok.eventsourcing.user.event.UserActivated;
import com.github.daggerok.eventsourcing.user.event.UserCreated;
import com.github.daggerok.eventsourcing.user.event.UserDeactivated;
import io.vavr.API;
import lombok.Getter;
import lombok.ToString;
import lombok.*;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -21,12 +20,12 @@ import static io.vavr.Predicates.instanceOf;
* Created user can be:
* - activated
* - deactivated
*
* <p>
* Activated user can be:
* - deactivated
* Activated user cannot be:
* - activated
*
* <p>
* Deactivated user can be:
* - activated
* Deactivated user cannot be:
@@ -34,27 +33,45 @@ import static io.vavr.Predicates.instanceOf;
*/
@Getter
@ToString
@NoArgsConstructor
public class User implements Function<DomainEvent, User> {
private final Collection<DomainEvent> eventStream = new CopyOnWriteArrayList<>();
private final UUID userId;
private UUID userId;
private UserStatus state;
public User(UUID userId) {
this.userId = userId;
state = UserStatus.PENDING;
}
private final Collection<DomainEvent> eventStream = new CopyOnWriteArrayList<>();
public void flushEvents() {
eventStream.clear();
}
// cmd 1:
public User(UUID userId) {
create(userId);
}
public void create(UUID userId) {
onCreate(new UserCreated(userId));
}
// cmd 0: create
public void create() {
create(UUID.randomUUID());
}
// evt: 0
private User onCreate(UserCreated event) {
eventStream.add(event);
this.userId = event.getAggregateId();
state = UserStatus.PENDING;
return this;
}
// cmd 1: activate
public void activate() {
if (state == UserStatus.ACTIVE)
throw new IllegalStateException("user is already active");
onActivate(new UserActivated(userId, ZonedDateTime.now()));
onActivate(new UserActivated(userId));
}
// evt: 1
private User onActivate(UserActivated event) {
eventStream.add(event);
@@ -62,13 +79,14 @@ public class User implements Function<DomainEvent, User> {
return this;
}
// cmd 2:
// cmd 2: deactivate
public void deactivate() {
if (state == UserStatus.SUSPENDED)
throw new IllegalStateException("user is already suspended");
onDeactivate(new UserDeactivated(userId, ZonedDateTime.now()));
onDeactivate(new UserDeactivated(userId));
}
// evt 2:
// evt: 2
private User onDeactivate(UserDeactivated event) {
eventStream.add(event);
state = UserStatus.SUSPENDED;
@@ -77,14 +95,16 @@ public class User implements Function<DomainEvent, User> {
/* es */
public static User recreate(User snapshot, Collection<DomainEvent> domainEvents) {
public static User recreate(UUID userId, Collection<DomainEvent> domainEvents) {
User snapshot = new User(userId);
return io.vavr.collection.List.ofAll(domainEvents)
.foldLeft(snapshot, User::apply);
.foldLeft(snapshot, User::apply);
}
@Override
public User apply(DomainEvent domainEvent) {
return API.Match(domainEvent).of(
Case($(instanceOf(UserCreated.class)), this::onCreate),
Case($(instanceOf(UserActivated.class)), this::onActivate),
Case($(instanceOf(UserDeactivated.class)), this::onDeactivate)
);

View File

@@ -1,15 +1,19 @@
package com.github.daggerok.eventsourcing.user.event;
import lombok.*;
import com.github.daggerok.eventsourcing.user.UserStatus;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import java.time.ZonedDateTime;
import java.util.UUID;
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PACKAGE)
@RequiredArgsConstructor
public class UserActivated implements DomainEvent {
UUID aggregateId;
ZonedDateTime at;
@NonNull UUID aggregateId;
final UserStatus state = UserStatus.ACTIVE;
final ZonedDateTime at = ZonedDateTime.now();
}

View File

@@ -0,0 +1,19 @@
package com.github.daggerok.eventsourcing.user.event;
import com.github.daggerok.eventsourcing.user.UserStatus;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import java.time.ZonedDateTime;
import java.util.UUID;
@Getter
@ToString
@RequiredArgsConstructor
public class UserCreated implements DomainEvent {
@NonNull UUID aggregateId;
final UserStatus state = UserStatus.PENDING;
final ZonedDateTime at = ZonedDateTime.now();
}

View File

@@ -1,15 +1,20 @@
package com.github.daggerok.eventsourcing.user.event;
import lombok.*;
import com.github.daggerok.eventsourcing.user.UserStatus;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import java.time.ZonedDateTime;
import java.util.UUID;
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PACKAGE)
@RequiredArgsConstructor
// @NoArgsConstructor(access = AccessLevel.PACKAGE)
public class UserDeactivated implements DomainEvent {
UUID aggregateId;
ZonedDateTime at;
@NonNull UUID aggregateId;
final UserStatus state = UserStatus.SUSPENDED;
final ZonedDateTime at = ZonedDateTime.now();
}

View File

@@ -8,7 +8,9 @@ class UserRepositoryTest extends Specification {
def 'save user operation should flush user eventStream'() {
given:
def user = new User(UUID.randomUUID())
def user = new User()
and:
user.create()
and:
user.activate()
when:
@@ -21,7 +23,9 @@ class UserRepositoryTest extends Specification {
given:
def userId = UUID.randomUUID()
and:
def user = new User(userId)
def user = new User()
and:
user.create(userId)
and:
user.activate()
and:

View File

@@ -5,15 +5,19 @@ import spock.lang.Specification
class UserTest extends Specification {
def 'created user should have pending state'() {
given:
def user = new User()
when:
def user = new User(UUID.randomUUID())
user.create()
then:
user.state == UserStatus.PENDING
}
def 'active user should not be activated'() {
given:
def user = new User(UUID.randomUUID())
def user = new User()
and:
user.create()
and:
user.activate()
when:
@@ -24,7 +28,9 @@ class UserTest extends Specification {
def 'created user can be activated'() {
given:
def user = new User(UUID.randomUUID())
def user = new User()
and:
user.create()
when:
user.activate()
then:
@@ -33,7 +39,9 @@ class UserTest extends Specification {
def 'suspended user cannot be deactivated'() {
given:
def user = new User(UUID.randomUUID())
def user = new User()
and:
user.create()
and:
user.deactivate()
when:
@@ -44,7 +52,9 @@ class UserTest extends Specification {
def 'active user can be deactivated'() {
given:
def user = new User(UUID.randomUUID())
def user = new User()
and:
user.create()
and:
user.activate()
when: