feature: add event store improvements update
This commit is contained in:
@@ -17,6 +17,12 @@ import java.util.*;
|
||||
public class EventStore implements EventStoreDB {
|
||||
|
||||
public static final int SNAPSHOT_FREQUENCY = 3;
|
||||
private static final String SAVE_EVENTS_QUERY = "INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())";
|
||||
private static final String LOAD_EVENTS_QUERY = "SELECT event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp FROM events e WHERE e.aggregate_id = :aggregate_id AND e.version > :version ORDER BY e.version ASC";
|
||||
private static final String SAVE_SNAPSHOT_QUERY = "INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) ON CONFLICT (aggregate_id) DO UPDATE SET data = :data, version = :version, timestamp = now()";
|
||||
private static final String HANDLE_CONCURRENCY_QUERY = "SELECT aggregate_id FROM events e WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE";
|
||||
private static final String LOAD_SNAPSHOT_QUERY = "SELECT aggregate_id, aggregate_type, data, metadata, version, timestamp FROM snapshots s WHERE s.aggregate_id = :aggregate_id";
|
||||
private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id";
|
||||
|
||||
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||
|
||||
@@ -26,14 +32,13 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
final List<Event> changes = new ArrayList<>(events);
|
||||
changes.forEach(event -> {
|
||||
int result = jdbcTemplate.update("INSERT INTO events (aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp) " +
|
||||
"values (:aggregate_id, :aggregate_type, :event_type, :data, :metadata, :version, now())",
|
||||
Map.of("aggregate_id", event.getAggregateId(),
|
||||
"aggregate_type", event.getAggregateType(),
|
||||
"event_type", event.getEventType(),
|
||||
"data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
"metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
"version", event.getVersion()));
|
||||
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of(
|
||||
"aggregate_id", event.getAggregateId(),
|
||||
"aggregate_type", event.getAggregateType(),
|
||||
"event_type", event.getEventType(),
|
||||
"data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
|
||||
"metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
|
||||
"version", event.getVersion()));
|
||||
|
||||
log.info("(saveEvents) saved result: {}, event: {}", result, event);
|
||||
});
|
||||
@@ -41,9 +46,7 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
@Override
|
||||
public List<Event> loadEvents(String aggregateId, long version) {
|
||||
final List<Event> events = jdbcTemplate.query("select event_id ,aggregate_id, aggregate_type, event_type, data, metadata, version, timestamp" +
|
||||
" from events e where e.aggregate_id = :aggregate_id and e.version > :version ORDER BY e.version ASC",
|
||||
Map.of("aggregate_id", aggregateId, "version", version),
|
||||
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
|
||||
(rs, rowNum) -> Event.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
@@ -62,10 +65,7 @@ public class EventStore implements EventStoreDB {
|
||||
aggregate.toSnapshot();
|
||||
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
|
||||
|
||||
int update = jdbcTemplate.update("INSERT INTO snapshots (aggregate_id, aggregate_type, data, metadata, version, timestamp) " +
|
||||
"VALUES (:aggregate_id, :aggregate_type, :data, :metadata, :version, now()) " +
|
||||
"ON CONFLICT (aggregate_id) " +
|
||||
"DO UPDATE SET data = :data, version = :version, timestamp = now()",
|
||||
int update = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
|
||||
Map.of("aggregate_id", snapshot.getAggregateId(),
|
||||
"aggregate_type", snapshot.getAggregateType(),
|
||||
"data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
|
||||
@@ -92,8 +92,7 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
private void handleConcurrency(String aggregateId) {
|
||||
try {
|
||||
String aggregateID = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events e " +
|
||||
"WHERE e.aggregate_id = :aggregate_id LIMIT 1 FOR UPDATE", Map.of("aggregate_id", aggregateId), String.class);
|
||||
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
|
||||
} catch (EmptyResultDataAccessException e) {
|
||||
log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
|
||||
@@ -102,16 +101,14 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
|
||||
private Optional<Snapshot> loadSnapshot(String aggregateId) {
|
||||
final Optional<Snapshot> snapshot = jdbcTemplate.query("select aggregate_id, aggregate_type, data, metadata, version, timestamp from snapshots s " +
|
||||
"where s.aggregate_id = :aggregate_id",
|
||||
Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
.data(rs.getBytes("data"))
|
||||
.metaData(rs.getBytes("metadata"))
|
||||
.version(rs.getLong("version"))
|
||||
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
|
||||
.build()).stream().findFirst();
|
||||
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
.data(rs.getBytes("data"))
|
||||
.metaData(rs.getBytes("metadata"))
|
||||
.version(rs.getLong("version"))
|
||||
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
|
||||
.build()).stream().findFirst();
|
||||
|
||||
snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result));
|
||||
return snapshot;
|
||||
@@ -156,7 +153,7 @@ public class EventStore implements EventStoreDB {
|
||||
@Override
|
||||
public Boolean exists(String aggregateId) {
|
||||
try {
|
||||
final var id = jdbcTemplate.queryForObject("SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id", Map.of("aggregate_id", aggregateId), String.class);
|
||||
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
log.info("aggregate exists id: {}", id);
|
||||
return true;
|
||||
} catch (Exception ex) {
|
||||
|
||||
Reference in New Issue
Block a user