diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index 32f2078..b7c5054 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -17,6 +17,12 @@ import java.util.*; public class EventStore implements EventStoreDB { public static final int SNAPSHOT_FREQUENCY = 3; + private static final String SAVE_EVENTS_QUERY = "INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())"; + private static final String LOAD_EVENTS_QUERY = "SELECT event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp FROM events e WHERE e.aggregate_id = :aggregate_id AND e.version > :version ORDER BY e.version ASC"; + private static final String SAVE_SNAPSHOT_QUERY = "INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) ON CONFLICT (aggregate_id) DO UPDATE SET data = :data, version = :version, timestamp = now()"; + private static final String HANDLE_CONCURRENCY_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE"; + private static final String LOAD_SNAPSHOT_QUERY = "SELECT aggregate_id, aggregate_type, data, metadata, version, timestamp FROM snapshots s WHERE s.aggregate_id = :aggregate_id"; + private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id"; private final NamedParameterJdbcTemplate jdbcTemplate; @@ -26,14 +32,13 @@ public class EventStore implements EventStoreDB { final List changes = new ArrayList<>(events); changes.forEach(event -> { - int result = jdbcTemplate.update("INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) " + - "values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())", - Map.of("aggregate_id", event.getAggregateId(), - "aggregate_type", event.getAggregateType(), - "event_type", event.getEventType(), - "data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(), - "metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(), - "version", event.getVersion())); + int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of( + "aggregate_id", event.getAggregateId(), + "aggregate_type", event.getAggregateType(), + "event_type", event.getEventType(), + "data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(), + "metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(), + "version", event.getVersion())); log.info("(saveEvents) saved result: {}, event: {}", result, event); }); @@ -41,9 +46,7 @@ public class EventStore implements EventStoreDB { @Override public List loadEvents(String aggregateId, long version) { - final List events = jdbcTemplate.query("select event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp" + - " from events e where e.aggregate_id = :aggregate_id and e.version > :version ORDER BY e.version ASC", - Map.of("aggregate_id", aggregateId, "version", version), + final List events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version), (rs, rowNum) -> Event.builder() .aggregateId(rs.getString("aggregate_id")) .aggregateType(rs.getString("aggregate_type")) @@ -62,10 +65,7 @@ public class EventStore implements EventStoreDB { aggregate.toSnapshot(); final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate); - int update = jdbcTemplate.update("INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) " + - "VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) " + - "ON CONFLICT (aggregate_id) " + - "DO UPDATE SET data = :data, version = :version, timestamp = now()", + int update = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY, Map.of("aggregate_id", snapshot.getAggregateId(), "aggregate_type", snapshot.getAggregateType(), "data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(), @@ -92,8 +92,7 @@ public class EventStore implements EventStoreDB { private void handleConcurrency(String aggregateId) { try { - String aggregateID = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events e " + - "WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE", Map.of("aggregate_id", aggregateId), String.class); + String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class); log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID); } catch (EmptyResultDataAccessException e) { log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage()); @@ -102,16 +101,14 @@ public class EventStore implements EventStoreDB { } private Optional loadSnapshot(String aggregateId) { - final Optional snapshot = jdbcTemplate.query("select aggregate_id, aggregate_type, data, metadata, version, timestamp from snapshots s " + - "where s.aggregate_id = :aggregate_id", - Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder() - .aggregateId(rs.getString("aggregate_id")) - .aggregateType(rs.getString("aggregate_type")) - .data(rs.getBytes("data")) - .metaData(rs.getBytes("metadata")) - .version(rs.getLong("version")) - .timeStamp(rs.getTimestamp("timestamp").toLocalDateTime()) - .build()).stream().findFirst(); + final Optional snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder() + .aggregateId(rs.getString("aggregate_id")) + .aggregateType(rs.getString("aggregate_type")) + .data(rs.getBytes("data")) + .metaData(rs.getBytes("metadata")) + .version(rs.getLong("version")) + .timeStamp(rs.getTimestamp("timestamp").toLocalDateTime()) + .build()).stream().findFirst(); snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result)); return snapshot; @@ -156,7 +153,7 @@ public class EventStore implements EventStoreDB { @Override public Boolean exists(String aggregateId) { try { - final var id = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id", Map.of("aggregate_id", aggregateId), String.class); + final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class); log.info("aggregate exists id: {}", id); return true; } catch (Exception ex) {