From fc7bb3ea381cc2c4d9ef7ff7224b131ce4c9e266 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 18 Apr 2022 10:18:13 +0300 Subject: [PATCH] feature: add event store improvements --- .../java/com/eventsourcing/es/EventStore.java | 83 ++++++++++--------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index ccf2236..f3f1cee 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -32,6 +32,48 @@ public class EventStore implements EventStoreDB { private final NamedParameterJdbcTemplate jdbcTemplate; private final EventBus eventBus; + + @Override + @Transactional + @NewSpan + public void save(@SpanTag("aggregate") T aggregate) { + final List aggregateEvents = new ArrayList<>(aggregate.getChanges()); + + if (aggregate.getVersion() > 1) { + this.handleConcurrency(aggregate.getId()); + } + + this.saveEvents(aggregate.getChanges()); + if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) { + this.saveSnapshot(aggregate); + } + + eventBus.publish(aggregateEvents); + + log.info("(save) saved aggregate: {}", aggregate); + } + + @Override + @Transactional(readOnly = true) + @NewSpan + public T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class aggregateType) { + + final Optional snapshot = this.loadSnapshot(aggregateId); + + final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType); + + final List events = this.loadEvents(aggregateId, aggregate.getVersion()); + events.forEach(event -> { + aggregate.raiseEvent(event); + log.info("raise event version: {}", event.getVersion()); + }); + + if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId); + + log.info("(load) loaded aggregate: {}", aggregate); + return aggregate; + } + @Override @NewSpan public void saveEvents(@SpanTag("events") List events) { @@ -97,25 +139,6 @@ public class EventStore implements EventStoreDB { log.info("(saveSnapshot) updateResult: {}", updateResult); } - @Override - @Transactional - @NewSpan - public void save(@SpanTag("aggregate") T aggregate) { - final List aggregateEvents = new ArrayList<>(aggregate.getChanges()); - - if (aggregate.getVersion() > 1) { - this.handleConcurrency(aggregate.getId()); - } - - this.saveEvents(aggregate.getChanges()); - if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) { - this.saveSnapshot(aggregate); - } - - eventBus.publish(aggregateEvents); - - log.info("(save) saved aggregate: {}", aggregate); - } @NewSpan private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) { @@ -158,26 +181,6 @@ public class EventStore implements EventStoreDB { return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType); } - @Override - @Transactional(readOnly = true) - @NewSpan - public T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class aggregateType) { - - final Optional snapshot = this.loadSnapshot(aggregateId); - - final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType); - - final List events = this.loadEvents(aggregateId, aggregate.getVersion()); - events.forEach(event -> { - aggregate.raiseEvent(event); - log.info("raise event version: {}", event.getVersion()); - }); - - if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId); - - log.info("(load) loaded aggregate: {}", aggregate); - return aggregate; - } @Override @NewSpan @@ -188,7 +191,7 @@ public class EventStore implements EventStoreDB { return true; } catch (Exception ex) { if (!(ex instanceof EmptyResultDataAccessException)) { - throw new RuntimeException("exists", ex); + throw new RuntimeException(ex); } return false; }