feature: add event sourcing microservice init
This commit is contained in:
@@ -0,0 +1,48 @@
|
||||
package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
|
||||
private final EventStoreDB eventStoreDB;
|
||||
|
||||
@Override
|
||||
public String handle(CreateBankAccountCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.createBankAccount(command.email(), command.address(), command.userName());
|
||||
eventStoreDB.save(aggregate);
|
||||
|
||||
log.info("(CreateBankAccountCommand) aggregate: {}", aggregate);
|
||||
return aggregate.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ChangeEmailCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeEmail(command.newEmail());
|
||||
eventStoreDB.save(aggregate);
|
||||
log.info("(ChangeEmailCommand) aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ChangeAddressCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeAddress(command.newAddress());
|
||||
eventStoreDB.save(aggregate);
|
||||
log.info("(ChangeAddressCommand) aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(DepositAmountCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.depositBalance(command.amount());
|
||||
eventStoreDB.save(aggregate);
|
||||
log.info("(DepositAmountCommand) aggregate: {}", aggregate);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
public interface BankAccountCommandService {
|
||||
String handle(CreateBankAccountCommand command);
|
||||
|
||||
void handle(ChangeEmailCommand command);
|
||||
|
||||
void handle(ChangeAddressCommand command);
|
||||
|
||||
void handle(DepositAmountCommand command);
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
public record ChangeAddressCommand(String aggregateID, String newAddress) {
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
public record ChangeEmailCommand(String aggregateID, String newEmail) {
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
public record CreateBankAccountCommand(String aggregateID, String email, String userName, String address) {
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
public record DepositAmountCommand(String aggregateID, BigDecimal amount) {
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
package com.eventsourcing.bankAccount.domain;
|
||||
|
||||
import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
|
||||
import com.eventsourcing.es.AggregateRoot;
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.SerializerUtils;
|
||||
import com.eventsourcing.es.exceptions.InvalidEventTypeException;
|
||||
import lombok.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@EqualsAndHashCode(callSuper = false)
|
||||
public class BankAccountAggregate extends AggregateRoot {
|
||||
|
||||
|
||||
public static final String AGGREGATE_TYPE = "BankAccountAggregate";
|
||||
|
||||
public BankAccountAggregate(String id) {
|
||||
super(id, AGGREGATE_TYPE);
|
||||
}
|
||||
|
||||
private String email;
|
||||
private String userName;
|
||||
private String address;
|
||||
private BigDecimal balance;
|
||||
|
||||
|
||||
@Override
|
||||
public void when(Event event) {
|
||||
switch (event.getEventType()) {
|
||||
case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 ->
|
||||
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class));
|
||||
case EmailChangedEvent.EMAIL_CHANGED_V1 ->
|
||||
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class));
|
||||
case AddressUpdatedEvent.ADDRESS_UPDATED_V1 ->
|
||||
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
|
||||
case BalanceDepositedEvent.BALANCE_DEPOSITED ->
|
||||
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
|
||||
default -> throw new InvalidEventTypeException(event.getEventType());
|
||||
}
|
||||
}
|
||||
|
||||
private void handle(final BankAccountCreatedEvent event) {
|
||||
this.email = event.getEmail();
|
||||
this.userName = event.getUserName();
|
||||
this.address = event.getAddress();
|
||||
this.balance = BigDecimal.valueOf(0);
|
||||
}
|
||||
|
||||
private void handle(final EmailChangedEvent event) {
|
||||
this.email = event.getNewEmail();
|
||||
}
|
||||
|
||||
private void handle(final AddressUpdatedEvent event) {
|
||||
this.address = event.getNewAddress();
|
||||
}
|
||||
|
||||
private void handle(final BalanceDepositedEvent event) {
|
||||
this.balance = this.balance.add(event.getAmount());
|
||||
}
|
||||
|
||||
public void createBankAccount(String email, String address, String userName) {
|
||||
final var data = BankAccountCreatedEvent.builder()
|
||||
.aggregateId(id)
|
||||
.email(email)
|
||||
.address(address)
|
||||
.userName(userName)
|
||||
.build();
|
||||
|
||||
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
|
||||
final var event = this.createEvent(BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1, dataBytes, null);
|
||||
this.apply(event);
|
||||
}
|
||||
|
||||
public void changeEmail(String email) {
|
||||
final var data = EmailChangedEvent.builder().aggregateId(id).newEmail(email).build();
|
||||
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
|
||||
final var event = this.createEvent(EmailChangedEvent.EMAIL_CHANGED_V1, dataBytes, null);
|
||||
apply(event);
|
||||
|
||||
}
|
||||
|
||||
public void changeAddress(String newAddress) {
|
||||
final var data = AddressUpdatedEvent.builder().aggregateId(id).newAddress(newAddress).build();
|
||||
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
|
||||
final var event = this.createEvent(AddressUpdatedEvent.ADDRESS_UPDATED_V1, dataBytes, null);
|
||||
apply(event);
|
||||
}
|
||||
|
||||
public void depositBalance(BigDecimal amount) {
|
||||
final var data = BalanceDepositedEvent.builder().aggregateId(id).amount(amount).build();
|
||||
final byte[] dataBytes = SerializerUtils.serializeToJsonBytes(data);
|
||||
final var event = this.createEvent(BalanceDepositedEvent.BALANCE_DEPOSITED, dataBytes, null);
|
||||
apply(event);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BankAccountAggregate{" +
|
||||
"email='" + email + '\'' +
|
||||
", userName='" + userName + '\'' +
|
||||
", address='" + address + '\'' +
|
||||
", balance=" + balance +
|
||||
", id='" + id + '\'' +
|
||||
", type='" + type + '\'' +
|
||||
", version=" + version +
|
||||
", changes=" + changes.size() +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.dto;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
public record BankAccountResponseDTO(
|
||||
String aggregateId,
|
||||
String email,
|
||||
String address,
|
||||
String userName,
|
||||
BigDecimal balance) {
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.dto;
|
||||
|
||||
public record ChangeAddressRequestDTO(String newAddress) {
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.dto;
|
||||
|
||||
public record ChangeEmailRequestDTO(String newEmail) {
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.eventsourcing.bankAccount.dto;
|
||||
|
||||
public record CreateBankAccountRequestDTO(
|
||||
String email,
|
||||
String address,
|
||||
String userName) {
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.eventsourcing.bankAccount.dto;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
public record DepositAmountRequestDTO(BigDecimal amount) {
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.eventsourcing.bankAccount.events;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.BaseEvent;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class AddressUpdatedEvent extends BaseEvent {
|
||||
public static final String ADDRESS_UPDATED_V1 = "ADDRESS_UPDATED_V1";
|
||||
public static final String AGGREGATE_TYPE = BankAccountAggregate.AGGREGATE_TYPE;
|
||||
|
||||
@Builder
|
||||
public AddressUpdatedEvent(String aggregateId, String newAddress) {
|
||||
super(aggregateId);
|
||||
this.newAddress = newAddress;
|
||||
}
|
||||
|
||||
private String newAddress;
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.eventsourcing.bankAccount.events;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.BaseEvent;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Data
|
||||
public class BalanceDepositedEvent extends BaseEvent {
|
||||
public static final String BALANCE_DEPOSITED = "BALANCE_DEPOSITED_V1";
|
||||
public static final String AGGREGATE_TYPE = BankAccountAggregate.AGGREGATE_TYPE;
|
||||
|
||||
private BigDecimal amount;
|
||||
|
||||
@Builder
|
||||
public BalanceDepositedEvent(String aggregateId, BigDecimal amount) {
|
||||
super(aggregateId);
|
||||
this.amount = amount;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.eventsourcing.bankAccount.events;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.BaseEvent;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class BankAccountCreatedEvent extends BaseEvent {
|
||||
public static final String BANK_ACCOUNT_CREATED_V1 = "BANK_ACCOUNT_CREATED_V1";
|
||||
public static final String AGGREGATE_TYPE = BankAccountAggregate.AGGREGATE_TYPE;
|
||||
|
||||
@Builder
|
||||
public BankAccountCreatedEvent(String aggregateId, String email, String userName, String address) {
|
||||
super(aggregateId);
|
||||
this.email = email;
|
||||
this.userName = userName;
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
private String email;
|
||||
private String userName;
|
||||
private String address;
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.eventsourcing.bankAccount.events;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.BaseEvent;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class EmailChangedEvent extends BaseEvent {
|
||||
public static final String EMAIL_CHANGED_V1 = "EMAIL_CHANGED_V1";
|
||||
public static final String AGGREGATE_TYPE = BankAccountAggregate.AGGREGATE_TYPE;
|
||||
|
||||
private String newEmail;
|
||||
|
||||
@Builder
|
||||
public EmailChangedEvent(String aggregateId, String newEmail) {
|
||||
super(aggregateId);
|
||||
this.newEmail = newEmail;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class BankAccountNotFoundException extends RuntimeException {
|
||||
|
||||
public BankAccountNotFoundException() {
|
||||
}
|
||||
|
||||
public BankAccountNotFoundException(String id) {
|
||||
super("bank account not found id: " + id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BankAccountQueryHandler implements BankAccountQueryService {
|
||||
|
||||
private final EventStoreDB eventStoreDB;
|
||||
|
||||
@Override
|
||||
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
|
||||
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
|
||||
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
|
||||
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
|
||||
return bankAccountResponseDTO;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
|
||||
public interface BankAccountQueryService {
|
||||
BankAccountResponseDTO handle(GetBankAccountByIDQuery query);
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
public record GetBankAccountByIDQuery(String aggregateID) {
|
||||
}
|
||||
96
src/main/java/com/eventsourcing/es/AggregateRoot.java
Normal file
96
src/main/java/com/eventsourcing/es/AggregateRoot.java
Normal file
@@ -0,0 +1,96 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import com.eventsourcing.es.exceptions.InvalidEventException;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public abstract class AggregateRoot {
|
||||
|
||||
protected String id;
|
||||
protected String type;
|
||||
protected long version;
|
||||
protected final List<Event> changes = new ArrayList<>();
|
||||
|
||||
public AggregateRoot(final String id, final String aggregateType) {
|
||||
this.id = id;
|
||||
this.type = aggregateType;
|
||||
}
|
||||
|
||||
|
||||
public abstract void when(final Event event);
|
||||
|
||||
public void load(final List<Event> events) {
|
||||
events.forEach(event -> {
|
||||
this.validateEvent(event);
|
||||
this.raiseEvent(event);
|
||||
this.version++;
|
||||
});
|
||||
}
|
||||
|
||||
public void apply(final Event event) {
|
||||
this.validateEvent(event);
|
||||
event.setAggregateType(this.type);
|
||||
|
||||
when(event);
|
||||
changes.add(event);
|
||||
|
||||
this.version++;
|
||||
event.setVersion(this.version);
|
||||
}
|
||||
|
||||
public void raiseEvent(final Event event) {
|
||||
this.validateEvent(event);
|
||||
|
||||
event.setAggregateType(this.type);
|
||||
when(event);
|
||||
|
||||
this.version++;
|
||||
}
|
||||
|
||||
public void clearChanges() {
|
||||
this.changes.clear();
|
||||
}
|
||||
|
||||
public void toSnapshot() {
|
||||
this.clearChanges();
|
||||
}
|
||||
|
||||
public String string() {
|
||||
return String.format("id: {%s}, type: {%s}, version: {%s}, changes: {%s}", id, type, version, changes.size());
|
||||
}
|
||||
|
||||
private void validateEvent(final Event event) {
|
||||
if (Objects.isNull(event) || !event.getAggregateId().equals(this.id))
|
||||
throw new InvalidEventException(event.toString());
|
||||
}
|
||||
|
||||
protected Event createEvent(String eventType, byte[] data, byte[] metadata) {
|
||||
return Event.builder()
|
||||
.aggregateId(this.getId())
|
||||
.version(this.getVersion())
|
||||
.aggregateType(this.getType())
|
||||
.eventType(eventType)
|
||||
.data(Objects.isNull(data) ? new byte[]{} : data)
|
||||
.metaData(Objects.isNull(metadata) ? new byte[]{} : metadata)
|
||||
.timeStamp(LocalDateTime.now())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AggregateRoot{" +
|
||||
"id='" + id + '\'' +
|
||||
", type='" + type + '\'' +
|
||||
", version=" + version +
|
||||
", changes=" + changes.size() +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
18
src/main/java/com/eventsourcing/es/BaseEvent.java
Normal file
18
src/main/java/com/eventsourcing/es/BaseEvent.java
Normal file
@@ -0,0 +1,18 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public abstract class BaseEvent {
|
||||
protected String aggregateId;
|
||||
|
||||
public BaseEvent(String aggregateId) {
|
||||
Objects.requireNonNull(aggregateId);
|
||||
if (aggregateId.isBlank()) throw new RuntimeException("BaseEvent aggregateId is required");
|
||||
this.aggregateId = aggregateId;
|
||||
}
|
||||
}
|
||||
20
src/main/java/com/eventsourcing/es/Constants.java
Normal file
20
src/main/java/com/eventsourcing/es/Constants.java
Normal file
@@ -0,0 +1,20 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
|
||||
public final class Constants {
|
||||
private Constants() {
|
||||
}
|
||||
|
||||
public static final String AGGREGATE_ID = "aggregate_id";
|
||||
public static final String SNAPSHOT_ID = "snapshot_id";
|
||||
public static final String AGGREGATE_TYPE = "aggregate_type";
|
||||
public static final String EVENT_TYPE = "event_type";
|
||||
public static final String EVENT_ID = "event_id";
|
||||
public static final String VERSION = "version";
|
||||
public static final String DATA = "data";
|
||||
public static final String METADATA = "metadata";
|
||||
public static final String TIMESTAMP = "timestamp";
|
||||
public static final String EVENTS = "events";
|
||||
|
||||
|
||||
}
|
||||
55
src/main/java/com/eventsourcing/es/Event.java
Normal file
55
src/main/java/com/eventsourcing/es/Event.java
Normal file
@@ -0,0 +1,55 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class Event {
|
||||
|
||||
public Event(String eventType, String aggregateType) {
|
||||
this.id = UUID.randomUUID();
|
||||
this.eventType = eventType;
|
||||
this.aggregateType = aggregateType;
|
||||
this.timeStamp = LocalDateTime.now();
|
||||
}
|
||||
|
||||
private UUID id;
|
||||
|
||||
private String aggregateId;
|
||||
|
||||
private String eventType;
|
||||
|
||||
private String aggregateType;
|
||||
|
||||
private long version;
|
||||
|
||||
private byte[] data;
|
||||
|
||||
private byte[] metaData;
|
||||
|
||||
// @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm")
|
||||
private LocalDateTime timeStamp;
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Event{" +
|
||||
"id=" + id +
|
||||
", aggregateId='" + aggregateId + '\'' +
|
||||
", eventType='" + eventType + '\'' +
|
||||
", aggregateType='" + aggregateType + '\'' +
|
||||
", version=" + version + '\'' +
|
||||
", timeStamp=" + timeStamp + '\'' +
|
||||
", data=" + new String(data) + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
7
src/main/java/com/eventsourcing/es/EventBus.java
Normal file
7
src/main/java/com/eventsourcing/es/EventBus.java
Normal file
@@ -0,0 +1,7 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface EventBus {
|
||||
void publish(List<Event> events);
|
||||
}
|
||||
32
src/main/java/com/eventsourcing/es/EventSourcingUtils.java
Normal file
32
src/main/java/com/eventsourcing/es/EventSourcingUtils.java
Normal file
@@ -0,0 +1,32 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.eventsourcing.es.Constants.EVENTS;
|
||||
|
||||
public class EventSourcingUtils {
|
||||
private EventSourcingUtils() {
|
||||
}
|
||||
|
||||
public static String getAggregateTypeTopic(final String aggregateType) {
|
||||
return String.format("%s_%s", aggregateType, EVENTS);
|
||||
}
|
||||
|
||||
public static <T extends AggregateRoot> Snapshot snapshotFromAggregate(final T aggregate) {
|
||||
byte[] bytes = SerializerUtils.serializeToJsonBytes(aggregate);
|
||||
return Snapshot.builder()
|
||||
.id(UUID.randomUUID())
|
||||
.aggregateId(aggregate.getId())
|
||||
.aggregateType(aggregate.getType())
|
||||
.version(aggregate.getVersion())
|
||||
.data(bytes)
|
||||
.timeStamp(LocalDateTime.now())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static <T extends AggregateRoot> T aggregateFromSnapshot(final Snapshot snapshot, final Class<T> valueType) {
|
||||
return SerializerUtils.deserializeFromJsonBytes(snapshot.getData(), valueType);
|
||||
}
|
||||
|
||||
}
|
||||
152
src/main/java/com/eventsourcing/es/EventStore.java
Normal file
152
src/main/java/com/eventsourcing/es/EventStore.java
Normal file
@@ -0,0 +1,152 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.*;
|
||||
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class EventStore implements EventStoreDB {
|
||||
|
||||
// private final JdbcTemplate jdbcTemplate;
|
||||
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||
|
||||
@Override
|
||||
public void saveEvents(List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
final List<Event> changes = new ArrayList<>(events);
|
||||
changes.forEach(event -> {
|
||||
int result = jdbcTemplate.update("INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) " +
|
||||
"values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())",
|
||||
Map.of("aggregate_id", event.getAggregateId(),
|
||||
"aggregate_type", event.getAggregateType(),
|
||||
"event_type", event.getEventType(),
|
||||
"data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
"metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
"version", event.getVersion()));
|
||||
|
||||
log.info("(saveEvents) saved event: {}", event);
|
||||
});
|
||||
}
|
||||
|
||||
private Snapshot getSnapshot(String aggregateId) {
|
||||
|
||||
return new Snapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Event> loadEvents(String aggregateId, long version) {
|
||||
final List<Event> events = jdbcTemplate.query("select event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp" +
|
||||
" from events e where e.aggregate_id = :aggregate_id and e.version > :version ORDER BY e.version ASC",
|
||||
Map.of("aggregate_id", aggregateId, "version", version),
|
||||
(rs, rowNum) -> {
|
||||
Event.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
.eventType(rs.getString("event_type"))
|
||||
.data(rs.getBytes("data"))
|
||||
.metaData(rs.getBytes("metadata"))
|
||||
.version(rs.getLong("version"))
|
||||
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
|
||||
.build();
|
||||
return null;
|
||||
});
|
||||
|
||||
log.info("(loadEvents) events list: {}", events);
|
||||
return events;
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> void saveSnapshot(T aggregate) {
|
||||
aggregate.toSnapshot();
|
||||
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
|
||||
|
||||
int update = jdbcTemplate.update("INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) " +
|
||||
"VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) " +
|
||||
"ON CONFLICT (aggregate_id) " +
|
||||
"DO UPDATE SET data = :data, version = :version, timestamp = now()",
|
||||
Map.of("aggregate_id", snapshot.getAggregateId(),
|
||||
"aggregate_type", snapshot.getAggregateType(),
|
||||
"data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
|
||||
"metadata", Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
|
||||
"version", snapshot.getVersion()));
|
||||
log.info("(saveSnapshot) result: {}", update);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public <T extends AggregateRoot> void save(T aggregate) {
|
||||
this.handleConcurrency(aggregate.getId());
|
||||
this.saveEvents(aggregate.getChanges());
|
||||
if (aggregate.getVersion() % 3 == 0) {
|
||||
this.saveSnapshot(aggregate);
|
||||
}
|
||||
log.info("(save) aggregate saved: {}", aggregate);
|
||||
}
|
||||
|
||||
private void handleConcurrency(String aggregateId) {
|
||||
int update = jdbcTemplate.update("SELECT aggregate_id FROM events e WHERE e.aggregate_id = ? LIMIT 1 FOR UPDATE", Map.of("aggregate_id", aggregateId));
|
||||
log.info("(handleConcurrency) result: {}", update);
|
||||
}
|
||||
|
||||
private Optional<Snapshot> loadSnapshot(String aggregateId) {
|
||||
final Optional<Snapshot> snapshot = jdbcTemplate.query("select aggregate_id, aggregate_type, data, metadata, version, timestamp from snapshots s " +
|
||||
"where s.aggregate_id = :aggregate_id",
|
||||
Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
.data(rs.getBytes("data"))
|
||||
.metaData(rs.getBytes("metadata"))
|
||||
.version(rs.getLong("version"))
|
||||
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
|
||||
.build()).stream().findFirst();
|
||||
snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result));
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
|
||||
try {
|
||||
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
|
||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> T getSnapshotFromClass(Optional<Snapshot> snapshot, String aggregateId, Class<T> aggregateType) {
|
||||
if (snapshot.isEmpty()) {
|
||||
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
|
||||
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
|
||||
}
|
||||
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public <T extends AggregateRoot> T load(String aggregateId, Class<T> aggregateType) {
|
||||
|
||||
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
|
||||
|
||||
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
|
||||
log.info("(load) aggregate: {}", aggregate);
|
||||
|
||||
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
|
||||
events.forEach(event -> {
|
||||
aggregate.raiseEvent(event);
|
||||
log.info("raise event: {}", event.getVersion());
|
||||
});
|
||||
|
||||
return aggregate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean exists(String aggregateId) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
16
src/main/java/com/eventsourcing/es/EventStoreDB.java
Normal file
16
src/main/java/com/eventsourcing/es/EventStoreDB.java
Normal file
@@ -0,0 +1,16 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface EventStoreDB {
|
||||
|
||||
void saveEvents(final List<Event> events);
|
||||
|
||||
List<Event> loadEvents(final String aggregateId, long version);
|
||||
|
||||
<T extends AggregateRoot> void save(final T aggregate);
|
||||
|
||||
<T extends AggregateRoot> T load(final String aggregateId, final Class<T> aggregateType);
|
||||
|
||||
Boolean exists(final String aggregateId);
|
||||
}
|
||||
5
src/main/java/com/eventsourcing/es/Projection.java
Normal file
5
src/main/java/com/eventsourcing/es/Projection.java
Normal file
@@ -0,0 +1,5 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
public interface Projection {
|
||||
void when(Event event);
|
||||
}
|
||||
68
src/main/java/com/eventsourcing/es/SerializerUtils.java
Normal file
68
src/main/java/com/eventsourcing/es/SerializerUtils.java
Normal file
@@ -0,0 +1,68 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
|
||||
public final class SerializerUtils {
|
||||
|
||||
private static final ObjectMapper objectMapper = JsonMapper.builder()
|
||||
.addModule(new ParameterNamesModule())
|
||||
.addModule(new Jdk8Module())
|
||||
.addModule(new JavaTimeModule())
|
||||
.build();
|
||||
|
||||
private SerializerUtils() {
|
||||
}
|
||||
|
||||
public static byte[] serializeToJsonBytes(final Object object) {
|
||||
try {
|
||||
return objectMapper.writeValueAsBytes(object);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> T deserializeFromJsonBytes(final byte[] jsonBytes, final Class<T> valueType) {
|
||||
try {
|
||||
return objectMapper.readValue(jsonBytes, valueType);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static Event[] deserializeEventsFromJsonBytes(final byte[] jsonBytes) {
|
||||
try {
|
||||
return objectMapper.readValue(jsonBytes, Event[].class);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static HashMap<String, byte[]> deserializeEventsMetadata(final byte[] metaData) {
|
||||
final var tr = new TypeReference<HashMap<String, byte[]>>() {
|
||||
};
|
||||
try {
|
||||
return objectMapper.readValue(metaData, tr);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static byte[] serializeEventsMetadata(final HashMap<String, byte[]> metaData) {
|
||||
try {
|
||||
final var valueAsString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metaData);
|
||||
return valueAsString.getBytes(StandardCharsets.UTF_8);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
45
src/main/java/com/eventsourcing/es/Snapshot.java
Normal file
45
src/main/java/com/eventsourcing/es/Snapshot.java
Normal file
@@ -0,0 +1,45 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class Snapshot {
|
||||
|
||||
private UUID id;
|
||||
|
||||
private String aggregateId;
|
||||
|
||||
|
||||
private String aggregateType;
|
||||
|
||||
private byte[] data;
|
||||
|
||||
private byte[] metaData;
|
||||
|
||||
|
||||
private long version;
|
||||
private LocalDateTime timeStamp;
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Snapshot{" +
|
||||
"id=" + id +
|
||||
", aggregateId='" + aggregateId + '\'' +
|
||||
", aggregateType='" + aggregateType + '\'' +
|
||||
", data=" + data.length + " bytes" +
|
||||
", metaData=" + metaData.length + " bytes" +
|
||||
", version=" + version +
|
||||
", timeStamp=" + timeStamp +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.eventsourcing.es.exceptions;
|
||||
|
||||
public class InvalidEventException extends RuntimeException{
|
||||
public InvalidEventException() {
|
||||
}
|
||||
|
||||
public InvalidEventException(String message) {
|
||||
super("invalid event: " + message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.eventsourcing.es.exceptions;
|
||||
|
||||
public class InvalidEventTypeException extends RuntimeException {
|
||||
public InvalidEventTypeException() {
|
||||
}
|
||||
|
||||
public InvalidEventTypeException(String eventType) {
|
||||
super("invalid event type: " + eventType);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
public class InternalServerErrorException extends RuntimeException{
|
||||
public InternalServerErrorException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InternalServerErrorException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
public record InternalServerErrorResponse(int status, String message, String timestamp) {
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class NotFoundExceptionResponse {
|
||||
private String message;
|
||||
private String timestamp;
|
||||
private int status;
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.eventsourcing.filters;
|
||||
|
||||
import com.eventsourcing.exceptions.InternalServerErrorResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.MethodArgumentNotValidException;
|
||||
import org.springframework.web.bind.annotation.ControllerAdvice;
|
||||
import org.springframework.web.bind.annotation.ExceptionHandler;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
import org.springframework.web.context.request.WebRequest;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@ControllerAdvice
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
|
||||
@Order(2)
|
||||
public class GlobalControllerAdvice {
|
||||
|
||||
|
||||
@ExceptionHandler(RuntimeException.class)
|
||||
public ResponseEntity<InternalServerErrorResponse> handleRuntimeException(RuntimeException ex, WebRequest request) {
|
||||
final var response = new InternalServerErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), LocalDateTime.now().toString());
|
||||
log.error("OrderNotFoundException response: {} ", response);
|
||||
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
|
||||
}
|
||||
|
||||
|
||||
@ResponseStatus(HttpStatus.BAD_REQUEST)
|
||||
@ExceptionHandler(MethodArgumentNotValidException.class)
|
||||
public ResponseEntity<Map<String, String>> handleInvalidArgument(MethodArgumentNotValidException ex) {
|
||||
final Map<String, String> errorMap = new HashMap<>();
|
||||
ex.getBindingResult().getFieldErrors().forEach(error -> errorMap.put(error.getField(), error.getDefaultMessage()));
|
||||
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(errorMap);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.eventsourcing.mappers;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
|
||||
public final class BankAccountMapper {
|
||||
|
||||
private BankAccountMapper() {
|
||||
}
|
||||
|
||||
|
||||
public static BankAccountResponseDTO bankAccountResponseDTOFromAggregate(BankAccountAggregate bankAccountAggregate) {
|
||||
return new BankAccountResponseDTO(
|
||||
bankAccountAggregate.getId(),
|
||||
bankAccountAggregate.getEmail(),
|
||||
bankAccountAggregate.getAddress(),
|
||||
bankAccountAggregate.getUserName(),
|
||||
bankAccountAggregate.getBalance()
|
||||
);
|
||||
}
|
||||
|
||||
// public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
|
||||
// return new BankAccountResponseDTO(
|
||||
// bankAccountDocument.getAggregateId(),
|
||||
// bankAccountDocument.getEmail(),
|
||||
// bankAccountDocument.getAddress(),
|
||||
// bankAccountDocument.getUserName(),
|
||||
// bankAccountDocument.getBalance()
|
||||
// );
|
||||
// }
|
||||
//
|
||||
// public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
|
||||
// return BankAccountDocument.builder()
|
||||
// .aggregateId(bankAccountAggregate.getId())
|
||||
// .email(bankAccountAggregate.getEmail())
|
||||
// .address(bankAccountAggregate.getAddress())
|
||||
// .userName(bankAccountAggregate.getUserName())
|
||||
// .balance(bankAccountAggregate.getBalance())
|
||||
// .build();
|
||||
// }
|
||||
}
|
||||
Reference in New Issue
Block a user