feature: add updates and improvements

This commit is contained in:
Alexander
2022-04-16 14:14:20 +03:00
parent 92974d37b6
commit c8105a8e91
5 changed files with 42 additions and 56 deletions

View File

@@ -6,7 +6,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.UUID;
@Data
@@ -14,6 +13,15 @@ import java.util.UUID;
@AllArgsConstructor
@Builder
public class Event {
private UUID id;
private String aggregateId;
private String eventType;
private String aggregateType;
private long version;
private byte[] data;
private byte[] metaData;
private LocalDateTime timeStamp;
public Event(String eventType, String aggregateType) {
this.id = UUID.randomUUID();
@@ -22,23 +30,6 @@ public class Event {
this.timeStamp = LocalDateTime.now();
}
private UUID id;
private String aggregateId;
private String eventType;
private String aggregateType;
private long version;
private byte[] data;
private byte[] metaData;
// @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm")
private LocalDateTime timeStamp;
@Override
public String toString() {

View File

@@ -13,6 +13,8 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import static com.eventsourcing.es.Constants.*;
@Repository
@RequiredArgsConstructor
@Slf4j
@@ -37,12 +39,12 @@ public class EventStore implements EventStoreDB {
final List<Event> changes = new ArrayList<>(events);
changes.forEach(event -> {
int result = jdbcTemplate.update(SAVE_EVENTS_QUERY, Map.of(
"aggregate_id", event.getAggregateId(),
"aggregate_type", event.getAggregateType(),
"event_type", event.getEventType(),
"data", Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
"metadata", Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
"version", event.getVersion()));
AGGREGATE_ID, event.getAggregateId(),
AGGREGATE_TYPE, event.getAggregateType(),
EVENT_TYPE, event.getEventType(),
DATA, Objects.isNull(event.getData()) ? new byte[]{} : event.getData(),
METADATA, Objects.isNull(event.getMetaData()) ? new byte[]{} : event.getMetaData(),
VERSION, event.getVersion()));
log.info("(saveEvents) saved result: {}, event: {}", result, event);
});
@@ -51,15 +53,15 @@ public class EventStore implements EventStoreDB {
@Override
@NewSpan
public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of(AGGREGATE_ID, aggregateId, VERSION, version),
(rs, rowNum) -> Event.builder()
.aggregateId(rs.getString("aggregate_id"))
.aggregateType(rs.getString("aggregate_type"))
.eventType(rs.getString("event_type"))
.data(rs.getBytes("data"))
.metaData(rs.getBytes("metadata"))
.version(rs.getLong("version"))
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
.aggregateId(rs.getString(AGGREGATE_ID))
.aggregateType(rs.getString(AGGREGATE_TYPE))
.eventType(rs.getString(EVENT_TYPE))
.data(rs.getBytes(DATA))
.metaData(rs.getBytes(METADATA))
.version(rs.getLong(VERSION))
.timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
.build());
log.info("(loadEvents) events list: {}", events);
@@ -72,11 +74,11 @@ public class EventStore implements EventStoreDB {
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
int update = jdbcTemplate.update(SAVE_SNAPSHOT_QUERY,
Map.of("aggregate_id", snapshot.getAggregateId(),
"aggregate_type", snapshot.getAggregateType(),
"data", Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
"metadata", Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
"version", snapshot.getVersion()));
Map.of(AGGREGATE_ID, snapshot.getAggregateId(),
AGGREGATE_TYPE, snapshot.getAggregateType(),
DATA, Objects.isNull(snapshot.getData()) ? new byte[]{} : snapshot.getData(),
METADATA, Objects.isNull(snapshot.getMetaData()) ? new byte[]{} : snapshot.getMetaData(),
VERSION, snapshot.getVersion()));
log.info("(saveSnapshot) result: {}", update);
}
@@ -104,7 +106,7 @@ public class EventStore implements EventStoreDB {
@NewSpan
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
try {
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
} catch (EmptyResultDataAccessException e) {
log.info("(handleConcurrency) EmptyResultDataAccessException: {}", e.getMessage());
@@ -114,13 +116,13 @@ public class EventStore implements EventStoreDB {
@NewSpan
private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
.aggregateId(rs.getString("aggregate_id"))
.aggregateType(rs.getString("aggregate_type"))
.data(rs.getBytes("data"))
.metaData(rs.getBytes("metadata"))
.version(rs.getLong("version"))
.timeStamp(rs.getTimestamp("timestamp").toLocalDateTime())
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of(AGGREGATE_ID, aggregateId), (rs, rowNum) -> Snapshot.builder()
.aggregateId(rs.getString(AGGREGATE_ID))
.aggregateType(rs.getString(AGGREGATE_TYPE))
.data(rs.getBytes(DATA))
.metaData(rs.getBytes(METADATA))
.version(rs.getLong(VERSION))
.timeStamp(rs.getTimestamp(TIMESTAMP).toLocalDateTime())
.build()).stream().findFirst();
snapshot.ifPresent(result -> log.info("(loadSnapshot) snapshot: {}", result));
@@ -170,7 +172,7 @@ public class EventStore implements EventStoreDB {
@NewSpan
public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
try {
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of(AGGREGATE_ID, aggregateId), String.class);
log.info("aggregate exists id: {}", id);
return true;
} catch (Exception ex) {

View File

@@ -17,7 +17,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@RequiredArgsConstructor
public class KafkaEventBus implements EventBus {
private final KafkaTemplate<String, byte[]> kafkaTemplate;
private final static long sendTimeout = 3000;
@Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}")
private String bankAccountTopicName;
@@ -29,7 +31,7 @@ public class KafkaEventBus implements EventBus {
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);
try {
kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS);
kafkaTemplate.send(record).get(sendTimeout, TimeUnit.MILLISECONDS);
log.info("publishing kafka record value >>>>> {}", new String(record.value()));
} catch (Exception ex) {

View File

@@ -15,21 +15,13 @@ import java.util.UUID;
public class Snapshot {
private UUID id;
private String aggregateId;
private String aggregateType;
private byte[] data;
private byte[] metaData;
private long version;
private LocalDateTime timeStamp;
@Override
public String toString() {
return "Snapshot{" +

View File

@@ -25,7 +25,6 @@ import java.util.Map;
@Order(2)
public class GlobalControllerAdvice {
@ExceptionHandler(RuntimeException.class)
public ResponseEntity<InternalServerErrorResponse> handleRuntimeException(RuntimeException ex, WebRequest request) {
final var response = new InternalServerErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), LocalDateTime.now().toString());