Compare commits
4 Commits
feature/im
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7f1a901d9 | ||
|
|
39430d8426 | ||
|
|
db49715fc0 | ||
|
|
98611cf3e8 |
@@ -0,0 +1,10 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class BankAccountDocumentNotFoundException extends RuntimeException {
|
||||
public BankAccountDocumentNotFoundException() {
|
||||
}
|
||||
|
||||
public BankAccountDocumentNotFoundException(String id) {
|
||||
super("bank account document not found id:" + id);
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
|
||||
import com.eventsourcing.bankAccount.exceptions.BankAccountDocumentNotFoundException;
|
||||
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
@@ -114,7 +115,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
document.setEmail(event.getNewEmail());
|
||||
@@ -126,7 +127,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
document.setAddress(event.getNewAddress());
|
||||
@@ -138,11 +139,10 @@ public class BankAccountMongoProjection implements Projection {
|
||||
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
throw new BankAccountDocumentNotFoundException(event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
final var balance = document.getBalance();
|
||||
final var newBalance = balance.add(event.getAmount());
|
||||
final var newBalance = document.getBalance().add(event.getAmount());
|
||||
document.setBalance(newBalance);
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
|
||||
import static com.eventsourcing.es.Constants.*;
|
||||
@@ -31,23 +32,81 @@ public class EventStore implements EventStoreDB {
|
||||
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||
private final EventBus eventBus;
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
|
||||
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
|
||||
|
||||
if (aggregate.getVersion() > 1) {
|
||||
this.handleConcurrency(aggregate.getId());
|
||||
}
|
||||
|
||||
this.saveEvents(aggregate.getChanges());
|
||||
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
|
||||
this.saveSnapshot(aggregate);
|
||||
}
|
||||
|
||||
eventBus.publish(aggregateEvents);
|
||||
|
||||
log.info("(save) saved aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
|
||||
|
||||
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
|
||||
|
||||
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
|
||||
|
||||
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
|
||||
events.forEach(event -> {
|
||||
aggregate.raiseEvent(event);
|
||||
log.info("raise event version: {}", event.getVersion());
|
||||
});
|
||||
|
||||
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
|
||||
|
||||
log.info("(load) loaded aggregate: {}", aggregate);
|
||||
return aggregate;
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public void saveEvents(@SpanTag("events") List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
final List<Event> changes = new ArrayList<>(events);
|
||||
changes.forEach(event -> {
|
||||
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of(
|
||||
AGGREGATE_ID, event.getAggregateId(),
|
||||
AGGREGATE_TYPE, event.getAggregateType(),
|
||||
EVENT_TYPE, event.getEventType(),
|
||||
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
VERSION, event.getVersion()));
|
||||
if (changes.size() > 1) {
|
||||
this.eventsBatchInsert(changes);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("(saveEvents) saved result: {}, event: {}", result, event);
|
||||
});
|
||||
final Event event = changes.get(0);
|
||||
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, mapFromEvent(event));
|
||||
log.info("(saveEvents) saved result: {}, event: {}", result, event);
|
||||
}
|
||||
|
||||
private Map<String, Serializable> mapFromEvent(Event event) {
|
||||
return Map.of(
|
||||
AGGREGATE_ID, event.getAggregateId(),
|
||||
AGGREGATE_TYPE, event.getAggregateType(),
|
||||
EVENT_TYPE, event.getEventType(),
|
||||
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
VERSION, event.getVersion());
|
||||
}
|
||||
|
||||
|
||||
@NewSpan
|
||||
private void eventsBatchInsert(@SpanTag("events") List<Event> events) {
|
||||
final var args = events.stream().map(this::mapFromEvent).toList();
|
||||
final Map<String, ?>[] maps = args.toArray(new Map[0]);
|
||||
int[] ints = jdbcTemplate.batchUpdate(SAVE_EVENTS_QUERY, maps);
|
||||
log.info("(saveEvents) BATCH saved result: {}, event: {}", ints);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -80,25 +139,6 @@ public class EventStore implements EventStoreDB {
|
||||
log.info("(saveSnapshot) updateResult: {}", updateResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
|
||||
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
|
||||
|
||||
if (aggregate.getVersion() > 1) {
|
||||
this.handleConcurrency(aggregate.getId());
|
||||
}
|
||||
|
||||
this.saveEvents(aggregate.getChanges());
|
||||
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
|
||||
this.saveSnapshot(aggregate);
|
||||
}
|
||||
|
||||
eventBus.publish(aggregateEvents);
|
||||
|
||||
log.info("(save) saved aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
|
||||
@@ -141,26 +181,6 @@ public class EventStore implements EventStoreDB {
|
||||
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
|
||||
|
||||
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
|
||||
|
||||
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
|
||||
|
||||
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
|
||||
events.forEach(event -> {
|
||||
aggregate.raiseEvent(event);
|
||||
log.info("raise event version: {}", event.getVersion());
|
||||
});
|
||||
|
||||
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
|
||||
|
||||
log.info("(load) loaded aggregate: {}", aggregate);
|
||||
return aggregate;
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
@@ -171,7 +191,7 @@ public class EventStore implements EventStoreDB {
|
||||
return true;
|
||||
} catch (Exception ex) {
|
||||
if (!(ex instanceof EmptyResultDataAccessException)) {
|
||||
throw new RuntimeException("exists", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ public class KafkaEventBus implements EventBus {
|
||||
log.info("publishing kafka record value >>>>> {}", new String(record.value()));
|
||||
|
||||
} catch (Exception ex) {
|
||||
log.error("(KafkaEventBus) publish get", ex);
|
||||
log.error("(KafkaEventBus) publish get timeout", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
public record ExceptionResponseDTO(int Status, String message, LocalDateTime timestamp) {
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
package com.eventsourcing.exceptions;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
public record NotFoundResponseDTO(int Status, String message, LocalDateTime timestamp) {
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
package com.eventsourcing.filters;
|
||||
|
||||
import com.eventsourcing.bankAccount.exceptions.BankAccountDocumentNotFoundException;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidAddressException;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidEmailException;
|
||||
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
|
||||
import com.eventsourcing.exceptions.InternalServerErrorResponse;
|
||||
import com.eventsourcing.exceptions.NotFoundResponseDTO;
|
||||
import com.eventsourcing.exceptions.ExceptionResponseDTO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.annotation.Order;
|
||||
@@ -43,10 +46,18 @@ public class GlobalControllerAdvice {
|
||||
}
|
||||
|
||||
@ResponseStatus(HttpStatus.NOT_FOUND)
|
||||
@ExceptionHandler(AggregateNotFoundException.class)
|
||||
public ResponseEntity<NotFoundResponseDTO> handleAggregateNotFoundException(AggregateNotFoundException ex) {
|
||||
final var notFoundResponseDTO = new NotFoundResponseDTO(HttpStatus.NOT_FOUND.value(), ex.getMessage(), LocalDateTime.now());
|
||||
log.error("AggregateNotFoundException response ex:", ex);
|
||||
@ExceptionHandler({AggregateNotFoundException.class, BankAccountDocumentNotFoundException.class})
|
||||
public ResponseEntity<ExceptionResponseDTO> handleAggregateNotFoundExceptions(AggregateNotFoundException ex) {
|
||||
final var notFoundResponseDTO = new ExceptionResponseDTO(HttpStatus.NOT_FOUND.value(), ex.getMessage(), LocalDateTime.now());
|
||||
log.error("handleAggregateNotFoundExceptions response ex:", ex);
|
||||
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(notFoundResponseDTO);
|
||||
}
|
||||
|
||||
@ResponseStatus(HttpStatus.BAD_REQUEST)
|
||||
@ExceptionHandler({InvalidAddressException.class, InvalidEmailException.class})
|
||||
public ResponseEntity<ExceptionResponseDTO> handleInvalidAggregateExceptions(AggregateNotFoundException ex) {
|
||||
final var notFoundResponseDTO = new ExceptionResponseDTO(HttpStatus.BAD_REQUEST.value(), ex.getMessage(), LocalDateTime.now());
|
||||
log.error("handleInvalidAggregateExceptions response ex:", ex);
|
||||
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(notFoundResponseDTO);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user