feature: add bank account aggregate

This commit is contained in:
Alexander
2022-04-13 15:56:50 +03:00
parent e8e8ded999
commit 1c7c055127
6 changed files with 82 additions and 15 deletions

View File

@@ -3,6 +3,7 @@ package com.eventsourcing.es;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@@ -48,7 +49,7 @@ public class EventStore implements EventStoreDB {
" from events e where e.aggregate_id = :aggregate_id and e.version > :version ORDER BY e.version ASC",
Map.of("aggregate_id", aggregateId, "version", version),
(rs, rowNum) -> {
Event.builder()
Event event = Event.builder()
.aggregateId(rs.getString("aggregate_id"))
.aggregateType(rs.getString("aggregate_type"))
.eventType(rs.getString("event_type"))
@@ -57,7 +58,7 @@ public class EventStore implements EventStoreDB {
.version(rs.getLong("version"))
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
.build();
return null;
return event;
});
log.info("(loadEvents) events list: {}", events);
@@ -83,7 +84,10 @@ public class EventStore implements EventStoreDB {
@Override
@Transactional
public <T extends AggregateRoot> void save(T aggregate) {
this.handleConcurrency(aggregate.getId());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
this.saveEvents(aggregate.getChanges());
if (aggregate.getVersion() % 3 == 0) {
this.saveSnapshot(aggregate);
@@ -92,8 +96,14 @@ public class EventStore implements EventStoreDB {
}
private void handleConcurrency(String aggregateId) {
int update = jdbcTemplate.update("SELECT aggregate_id FROM events e WHERE e.aggregate_id = ? LIMIT 1 FOR UPDATE", Map.of("aggregate_id", aggregateId));
log.info("(handleConcurrency) result: {}", update);
try {
String aggregateID = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events e " +
"WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE", Map.of("aggregate_id", aggregateId), String.class);
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
} catch (EmptyResultDataAccessException e) {
log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
}
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
}
private Optional<Snapshot> loadSnapshot(String aggregateId) {
@@ -139,9 +149,11 @@ public class EventStore implements EventStoreDB {
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
events.forEach(event -> {
aggregate.raiseEvent(event);
log.info("raise event: {}", event.getVersion());
log.info("raise event version: {}", event.getVersion());
});
if (aggregate.getVersion() == 0) throw new RuntimeException("aggregate not found id:" + aggregateId);
return aggregate;
}