4 Commits

Author SHA1 Message Date
Alexander
e7f1a901d9 feature: add bank account domain improvements 2022-04-18 10:28:02 +03:00
Alexander
39430d8426 feature: add event store improvements 2022-04-18 10:28:02 +03:00
Alexander
db49715fc0 feature: add event store improvements update 2022-04-18 10:28:02 +03:00
Alexander
98611cf3e8 feature: add event store improvements 2022-04-18 10:28:02 +03:00
7 changed files with 108 additions and 67 deletions

View File

@@ -0,0 +1,10 @@
package com.eventsourcing.bankAccount.exceptions;
public class BankAccountDocumentNotFoundException extends RuntimeException {
public BankAccountDocumentNotFoundException() {
}
public BankAccountDocumentNotFoundException(String id) {
super("bank account document not found id:" + id);
}
}

View File

@@ -7,6 +7,7 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
import com.eventsourcing.bankAccount.exceptions.BankAccountDocumentNotFoundException;
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
import com.eventsourcing.es.Event;
import com.eventsourcing.es.EventStoreDB;
@@ -114,7 +115,7 @@ public class BankAccountMongoProjection implements Projection {
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
final var document = documentOptional.get();
document.setEmail(event.getNewEmail());
@@ -126,7 +127,7 @@ public class BankAccountMongoProjection implements Projection {
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
final var document = documentOptional.get();
document.setAddress(event.getNewAddress());
@@ -138,11 +139,10 @@ public class BankAccountMongoProjection implements Projection {
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
final var document = documentOptional.get();
final var balance = document.getBalance();
final var newBalance = balance.add(event.getAmount());
final var newBalance = document.getBalance().add(event.getAmount());
document.setBalance(newBalance);
mongoRepository.save(document);
}

View File

@@ -11,6 +11,7 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.util.*;
import static com.eventsourcing.es.Constants.*;
@@ -31,23 +32,81 @@ public class EventStore implements EventStoreDB {
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventBus eventBus;
@Override
@Transactional
@NewSpan
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
this.saveEvents(aggregate.getChanges());
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
this.saveSnapshot(aggregate);
}
eventBus.publish(aggregateEvents);
log.info("(save) saved aggregate: {}", aggregate);
}
@Override
@Transactional(readOnly = true)
@NewSpan
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
events.forEach(event -> {
aggregate.raiseEvent(event);
log.info("raise event version: {}", event.getVersion());
});
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
log.info("(load) loaded aggregate: {}", aggregate);
return aggregate;
}
@Override
@NewSpan
public void saveEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
final List<Event> changes = new ArrayList<>(events);
changes.forEach(event -> {
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of(
AGGREGATE_ID, event.getAggregateId(),
AGGREGATE_TYPE, event.getAggregateType(),
EVENT_TYPE, event.getEventType(),
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
VERSION, event.getVersion()));
if (changes.size() > 1) {
this.eventsBatchInsert(changes);
return;
}
log.info("(saveEvents) saved result: {}, event: {}", result, event);
});
final Event event = changes.get(0);
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, mapFromEvent(event));
log.info("(saveEvents) saved result: {}, event: {}", result, event);
}
private Map<String, Serializable> mapFromEvent(Event event) {
return Map.of(
AGGREGATE_ID, event.getAggregateId(),
AGGREGATE_TYPE, event.getAggregateType(),
EVENT_TYPE, event.getEventType(),
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
VERSION, event.getVersion());
}
@NewSpan
private void eventsBatchInsert(@SpanTag("events") List<Event> events) {
final var args = events.stream().map(this::mapFromEvent).toList();
final Map<String, ?>[] maps = args.toArray(new Map[0]);
int[] ints = jdbcTemplate.batchUpdate(SAVE_EVENTS_QUERY, maps);
log.info("(saveEvents) BATCH saved result: {}, event: {}", ints);
}
@Override
@@ -80,25 +139,6 @@ public class EventStore implements EventStoreDB {
log.info("(saveSnapshot) updateResult: {}", updateResult);
}
@Override
@Transactional
@NewSpan
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
this.saveEvents(aggregate.getChanges());
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
this.saveSnapshot(aggregate);
}
eventBus.publish(aggregateEvents);
log.info("(save) saved aggregate: {}", aggregate);
}
@NewSpan
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
@@ -141,26 +181,6 @@ public class EventStore implements EventStoreDB {
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
}
@Override
@Transactional(readOnly = true)
@NewSpan
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
events.forEach(event -> {
aggregate.raiseEvent(event);
log.info("raise event version: {}", event.getVersion());
});
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
log.info("(load) loaded aggregate: {}", aggregate);
return aggregate;
}
@Override
@NewSpan
@@ -171,7 +191,7 @@ public class EventStore implements EventStoreDB {
return true;
} catch (Exception ex) {
if (!(ex instanceof EmptyResultDataAccessException)) {
throw new RuntimeException("exists", ex);
throw new RuntimeException(ex);
}
return false;
}

View File

@@ -35,7 +35,7 @@ public class KafkaEventBus implements EventBus {
log.info("publishing kafka record value >>>>> {}", new String(record.value()));
} catch (Exception ex) {
log.error("(KafkaEventBus) publish get", ex);
log.error("(KafkaEventBus) publish get timeout", ex);
throw new RuntimeException(ex);
}
}

View File

@@ -0,0 +1,6 @@
package com.eventsourcing.exceptions;
import java.time.LocalDateTime;
public record ExceptionResponseDTO(int Status, String message, LocalDateTime timestamp) {
}

View File

@@ -1,6 +0,0 @@
package com.eventsourcing.exceptions;
import java.time.LocalDateTime;
public record NotFoundResponseDTO(int Status, String message, LocalDateTime timestamp) {
}

View File

@@ -1,8 +1,11 @@
package com.eventsourcing.filters;
import com.eventsourcing.bankAccount.exceptions.BankAccountDocumentNotFoundException;
import com.eventsourcing.bankAccount.exceptions.InvalidAddressException;
import com.eventsourcing.bankAccount.exceptions.InvalidEmailException;
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
import com.eventsourcing.exceptions.InternalServerErrorResponse;
import com.eventsourcing.exceptions.NotFoundResponseDTO;
import com.eventsourcing.exceptions.ExceptionResponseDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
@@ -43,10 +46,18 @@ public class GlobalControllerAdvice {
}
@ResponseStatus(HttpStatus.NOT_FOUND)
@ExceptionHandler(AggregateNotFoundException.class)
public ResponseEntity<NotFoundResponseDTO> handleAggregateNotFoundException(AggregateNotFoundException ex) {
final var notFoundResponseDTO = new NotFoundResponseDTO(HttpStatus.NOT_FOUND.value(), ex.getMessage(), LocalDateTime.now());
log.error("AggregateNotFoundException response ex:", ex);
@ExceptionHandler({AggregateNotFoundException.class, BankAccountDocumentNotFoundException.class})
public ResponseEntity<ExceptionResponseDTO> handleAggregateNotFoundExceptions(AggregateNotFoundException ex) {
final var notFoundResponseDTO = new ExceptionResponseDTO(HttpStatus.NOT_FOUND.value(), ex.getMessage(), LocalDateTime.now());
log.error("handleAggregateNotFoundExceptions response ex:", ex);
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(notFoundResponseDTO);
}
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler({InvalidAddressException.class, InvalidEmailException.class})
public ResponseEntity<ExceptionResponseDTO> handleInvalidAggregateExceptions(AggregateNotFoundException ex) {
final var notFoundResponseDTO = new ExceptionResponseDTO(HttpStatus.BAD_REQUEST.value(), ex.getMessage(), LocalDateTime.now());
log.error("handleInvalidAggregateExceptions response ex:", ex);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(notFoundResponseDTO);
}
}