feature: add event store improvements

This commit is contained in:
Alexander
2022-04-18 10:18:13 +03:00
parent f29c5544ab
commit fc7bb3ea38

View File

@@ -32,6 +32,48 @@ public class EventStore implements EventStoreDB {
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventBus eventBus;
@Override
@Transactional
@NewSpan
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
this.saveEvents(aggregate.getChanges());
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
this.saveSnapshot(aggregate);
}
eventBus.publish(aggregateEvents);
log.info("(save) saved aggregate: {}", aggregate);
}
@Override
@Transactional(readOnly = true)
@NewSpan
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
events.forEach(event -> {
aggregate.raiseEvent(event);
log.info("raise event version: {}", event.getVersion());
});
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
log.info("(load) loaded aggregate: {}", aggregate);
return aggregate;
}
@Override
@NewSpan
public void saveEvents(@SpanTag("events") List<Event> events) {
@@ -97,25 +139,6 @@ public class EventStore implements EventStoreDB {
log.info("(saveSnapshot) updateResult: {}", updateResult);
}
@Override
@Transactional
@NewSpan
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
this.saveEvents(aggregate.getChanges());
if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) {
this.saveSnapshot(aggregate);
}
eventBus.publish(aggregateEvents);
log.info("(save) saved aggregate: {}", aggregate);
}
@NewSpan
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
@@ -158,26 +181,6 @@ public class EventStore implements EventStoreDB {
return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType);
}
@Override
@Transactional(readOnly = true)
@NewSpan
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType);
final List<Event> events = this.loadEvents(aggregateId, aggregate.getVersion());
events.forEach(event -> {
aggregate.raiseEvent(event);
log.info("raise event version: {}", event.getVersion());
});
if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId);
log.info("(load) loaded aggregate: {}", aggregate);
return aggregate;
}
@Override
@NewSpan
@@ -188,7 +191,7 @@ public class EventStore implements EventStoreDB {
return true;
} catch (Exception ex) {
if (!(ex instanceof EmptyResultDataAccessException)) {
throw new RuntimeException("exists", ex);
throw new RuntimeException(ex);
}
return false;
}