diff --git a/src/main/java/com/eventsourcing/es/Event.java b/src/main/java/com/eventsourcing/es/Event.java index 68d7ead..00105aa 100644 --- a/src/main/java/com/eventsourcing/es/Event.java +++ b/src/main/java/com/eventsourcing/es/Event.java @@ -6,7 +6,6 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.time.LocalDateTime; -import java.time.ZonedDateTime; import java.util.UUID; @Data @@ -14,6 +13,15 @@ import java.util.UUID; @AllArgsConstructor @Builder public class Event { + private UUID id; + private String aggregateId; + private String eventType; + private String aggregateType; + private long version; + private byte[] data; + private byte[] metaData; + private LocalDateTime timeStamp; + public Event(String eventType, String aggregateType) { this.id = UUID.randomUUID(); @@ -22,23 +30,6 @@ public class Event { this.timeStamp = LocalDateTime.now(); } - private UUID id; - - private String aggregateId; - - private String eventType; - - private String aggregateType; - - private long version; - - private byte[] data; - - private byte[] metaData; - - // @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm") - private LocalDateTime timeStamp; - @Override public String toString() { diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index bbbca34..db5d346 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -13,6 +13,8 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; +import static com.eventsourcing.es.Constants.*; + @Repository @RequiredArgsConstructor @Slf4j @@ -37,12 +39,12 @@ public class EventStore implements EventStoreDB { final List changes = new ArrayList<>(events); changes.forEach(event -> { int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of( - "aggregate_id", event.getAggregateId(), - "aggregate_type", event.getAggregateType(), - "event_type", event.getEventType(), - "data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(), - "metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(), - "version", event.getVersion())); + AGGREGATE_ID, event.getAggregateId(), + AGGREGATE_TYPE, event.getAggregateType(), + EVENT_TYPE, event.getEventType(), + DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(), + METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(), + VERSION, event.getVersion())); log.info("(saveEvents) saved result: {}, event: {}", result, event); }); @@ -51,15 +53,15 @@ public class EventStore implements EventStoreDB { @Override @NewSpan public List loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) { - final List events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version), + final List events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of(AGGREGATE_ID, aggregateId, VERSION, version), (rs, rowNum) -> Event.builder() - .aggregateId(rs.getString("aggregate_id")) - .aggregateType(rs.getString("aggregate_type")) - .eventType(rs.getString("event_type")) - .data(rs.getBytes("data")) - .metaData(rs.getBytes("metadata")) - .version(rs.getLong("version")) - .timeStamp(rs.getTimestamp("timestamp").toLocalDateTime()) + .aggregateId(rs.getString(AGGREGATE_ID)) + .aggregateType(rs.getString(AGGREGATE_TYPE)) + .eventType(rs.getString(EVENT_TYPE)) + .data(rs.getBytes(DATA)) + .metaData(rs.getBytes(METADATA)) + .version(rs.getLong(VERSION)) + .timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime()) .build()); log.info("(loadEvents) events list: {}", events); @@ -72,11 +74,11 @@ public class EventStore implements EventStoreDB { final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate); int update = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY, - Map.of("aggregate_id", snapshot.getAggregateId(), - "aggregate_type", snapshot.getAggregateType(), - "data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(), - "metadata", Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(), - "version", snapshot.getVersion())); + Map.of(AGGREGATE_ID, snapshot.getAggregateId(), + AGGREGATE_TYPE, snapshot.getAggregateType(), + DATA, Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(), + METADATA, Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(), + VERSION, snapshot.getVersion())); log.info("(saveSnapshot) result: {}", update); } @@ -104,7 +106,7 @@ public class EventStore implements EventStoreDB { @NewSpan private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) { try { - String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class); + String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class); log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID); } catch (EmptyResultDataAccessException e) { log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage()); @@ -114,13 +116,13 @@ public class EventStore implements EventStoreDB { @NewSpan private Optional loadSnapshot(@SpanTag("aggregateId") String aggregateId) { - final Optional snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder() - .aggregateId(rs.getString("aggregate_id")) - .aggregateType(rs.getString("aggregate_type")) - .data(rs.getBytes("data")) - .metaData(rs.getBytes("metadata")) - .version(rs.getLong("version")) - .timeStamp(rs.getTimestamp("timestamp").toLocalDateTime()) + final Optional snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of(AGGREGATE_ID, aggregateId), (rs, rowNum) -> Snapshot.builder() + .aggregateId(rs.getString(AGGREGATE_ID)) + .aggregateType(rs.getString(AGGREGATE_TYPE)) + .data(rs.getBytes(DATA)) + .metaData(rs.getBytes(METADATA)) + .version(rs.getLong(VERSION)) + .timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime()) .build()).stream().findFirst(); snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result)); @@ -170,7 +172,7 @@ public class EventStore implements EventStoreDB { @NewSpan public Boolean exists(@SpanTag("aggregateId") String aggregateId) { try { - final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class); + final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class); log.info("aggregate exists id: {}", id); return true; } catch (Exception ex) { diff --git a/src/main/java/com/eventsourcing/es/KafkaEventBus.java b/src/main/java/com/eventsourcing/es/KafkaEventBus.java index 9a7373d..7152ed0 100644 --- a/src/main/java/com/eventsourcing/es/KafkaEventBus.java +++ b/src/main/java/com/eventsourcing/es/KafkaEventBus.java @@ -17,7 +17,9 @@ import java.util.concurrent.TimeUnit; @Slf4j @RequiredArgsConstructor public class KafkaEventBus implements EventBus { + private final KafkaTemplate kafkaTemplate; + private final static long sendTimeout = 3000; @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @@ -29,7 +31,7 @@ public class KafkaEventBus implements EventBus { final ProducerRecord record = new ProducerRecord<>(bankAccountTopicName, eventsBytes); try { - kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS); + kafkaTemplate.send(record).get(sendTimeout, TimeUnit.MILLISECONDS); log.info("publishing kafka record value >>>>> {}", new String(record.value())); } catch (Exception ex) { diff --git a/src/main/java/com/eventsourcing/es/Snapshot.java b/src/main/java/com/eventsourcing/es/Snapshot.java index ea8d43d..506070a 100644 --- a/src/main/java/com/eventsourcing/es/Snapshot.java +++ b/src/main/java/com/eventsourcing/es/Snapshot.java @@ -15,21 +15,13 @@ import java.util.UUID; public class Snapshot { private UUID id; - private String aggregateId; - - private String aggregateType; - private byte[] data; - private byte[] metaData; - - private long version; private LocalDateTime timeStamp; - @Override public String toString() { return "Snapshot{" + diff --git a/src/main/java/com/eventsourcing/filters/GlobalControllerAdvice.java b/src/main/java/com/eventsourcing/filters/GlobalControllerAdvice.java index 32674fd..2ae8dea 100644 --- a/src/main/java/com/eventsourcing/filters/GlobalControllerAdvice.java +++ b/src/main/java/com/eventsourcing/filters/GlobalControllerAdvice.java @@ -25,7 +25,6 @@ import java.util.Map; @Order(2) public class GlobalControllerAdvice { - @ExceptionHandler(RuntimeException.class) public ResponseEntity handleRuntimeException(RuntimeException ex, WebRequest request) { final var response = new InternalServerErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), LocalDateTime.now().toString());