package com.eventsourcing.es; import com.eventsourcing.es.exceptions.AggregateNotFoundException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import java.util.*; @Repository @RequiredArgsConstructor @Slf4j public class EventStore implements EventStoreDB { public static final int SNAPSHOT_FREQUENCY = 3; private final NamedParameterJdbcTemplate jdbcTemplate; @Override public void saveEvents(List events) { if (events.isEmpty()) return; final List changes = new ArrayList<>(events); changes.forEach(event -> { int result = jdbcTemplate.update("INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) " + "values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())", Map.of("aggregate_id", event.getAggregateId(), "aggregate_type", event.getAggregateType(), "event_type", event.getEventType(), "data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(), "metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(), "version", event.getVersion())); log.info("(saveEvents) saved result: {}, event: {}", result, event); }); } @Override public List loadEvents(String aggregateId, long version) { final List events = jdbcTemplate.query("select event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp" + " from events e where e.aggregate_id = :aggregate_id and e.version > :version ORDER BY e.version ASC", Map.of("aggregate_id", aggregateId, "version", version), (rs, rowNum) -> Event.builder() .aggregateId(rs.getString("aggregate_id")) .aggregateType(rs.getString("aggregate_type")) .eventType(rs.getString("event_type")) .data(rs.getBytes("data")) .metaData(rs.getBytes("metadata")) .version(rs.getLong("version")) .timeStamp(rs.getTimestamp("timestamp").toLocalDateTime()) .build()); log.info("(loadEvents) events list: {}", events); return events; } private void saveSnapshot(T aggregate) { aggregate.toSnapshot(); final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate); int update = jdbcTemplate.update("INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) " + "VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) " + "ON CONFLICT (aggregate_id) " + "DO UPDATE SET data = :data, version = :version, timestamp = now()", Map.of("aggregate_id", snapshot.getAggregateId(), "aggregate_type", snapshot.getAggregateType(), "data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(), "metadata", Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(), "version", snapshot.getVersion())); log.info("(saveSnapshot) result: {}", update); } @Override @Transactional public void save(T aggregate) { if (aggregate.getVersion() > 1) { this.handleConcurrency(aggregate.getId()); } this.saveEvents(aggregate.getChanges()); if (aggregate.getVersion() % SNAPSHOT_FREQUENCY == 0) { this.saveSnapshot(aggregate); } log.info("(save) saved aggregate: {}", aggregate); } private void handleConcurrency(String aggregateId) { try { String aggregateID = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events e " + "WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE", Map.of("aggregate_id", aggregateId), String.class); log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID); } catch (EmptyResultDataAccessException e) { log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage()); } log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId); } private Optional loadSnapshot(String aggregateId) { final Optional snapshot = jdbcTemplate.query("select aggregate_id, aggregate_type, data, metadata, version, timestamp from snapshots s " + "where s.aggregate_id = :aggregate_id", Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder() .aggregateId(rs.getString("aggregate_id")) .aggregateType(rs.getString("aggregate_type")) .data(rs.getBytes("data")) .metaData(rs.getBytes("metadata")) .version(rs.getLong("version")) .timeStamp(rs.getTimestamp("timestamp").toLocalDateTime()) .build()).stream().findFirst(); snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result)); return snapshot; } private T getAggregate(final String aggregateId, final Class aggregateType) { try { return aggregateType.getConstructor(String.class).newInstance(aggregateId); } catch (Exception ex) { throw new RuntimeException(ex); } } private T getSnapshotFromClass(Optional snapshot, String aggregateId, Class aggregateType) { if (snapshot.isEmpty()) { final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType)); return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType); } return EventSourcingUtils.aggregateFromSnapshot(snapshot.get(), aggregateType); } @Override @Transactional(readOnly = true) public T load(String aggregateId, Class aggregateType) { final Optional snapshot = this.loadSnapshot(aggregateId); final var aggregate = this.getSnapshotFromClass(snapshot, aggregateId, aggregateType); final List events = this.loadEvents(aggregateId, aggregate.getVersion()); events.forEach(event -> { aggregate.raiseEvent(event); log.info("raise event version: {}", event.getVersion()); }); if (aggregate.getVersion() == 0) throw new AggregateNotFoundException(aggregateId); log.info("(load) loaded aggregate: {}", aggregate); return aggregate; } @Override public Boolean exists(String aggregateId) { try { final var id = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id", Map.of("aggregate_id", aggregateId), String.class); log.info("aggregate exists id: {}", id); return true; } catch (Exception ex) { if (!(ex instanceof EmptyResultDataAccessException)) { throw new RuntimeException("exists", ex); } return false; } } }