diff --git a/pom.xml b/pom.xml index a4e6750..860799b 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,16 @@ 17 + + org.springframework.cloud + spring-cloud-sleuth-zipkin + 3.1.1 + + + org.springframework.cloud + spring-cloud-starter-sleuth + 3.1.1 + io.micrometer micrometer-registry-prometheus diff --git a/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java b/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java index 6fac090..76464a9 100644 --- a/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java @@ -5,6 +5,7 @@ import com.eventsourcing.bankAccount.domain.BankAccountAggregate; import com.eventsourcing.es.EventStoreDB; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.sleuth.annotation.NewSpan; import org.springframework.stereotype.Service; @RequiredArgsConstructor @@ -15,6 +16,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ private final EventStoreDB eventStoreDB; @Override + @NewSpan public String handle(CreateBankAccountCommand command) { final var aggregate = new BankAccountAggregate(command.aggregateID()); aggregate.createBankAccount(command.email(), command.address(), command.userName()); @@ -25,6 +27,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ } @Override + @NewSpan public void handle(ChangeEmailCommand command) { final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class); aggregate.changeEmail(command.newEmail()); @@ -33,6 +36,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ } @Override + @NewSpan public void handle(ChangeAddressCommand command) { final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class); aggregate.changeAddress(command.newAddress()); @@ -41,6 +45,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ } @Override + @NewSpan public void handle(DepositAmountCommand command) { final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class); aggregate.depositBalance(command.amount()); diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 6ed2f34..6eb4c1f 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -15,6 +15,7 @@ import com.eventsourcing.es.SerializerUtils; import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.sleuth.annotation.NewSpan; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.Acknowledgment; @@ -54,6 +55,7 @@ public class BankAccountMongoProjection implements Projection { } } + @NewSpan private void processEvents(List events) { if (events.isEmpty()) return; @@ -69,6 +71,7 @@ public class BankAccountMongoProjection implements Projection { } @Override + @NewSpan public void when(Event event) { final var aggregateId = event.getAggregateId(); log.info("(when) >>>>> aggregateId: {}", aggregateId); @@ -87,6 +90,7 @@ public class BankAccountMongoProjection implements Projection { } + @NewSpan private void handle(BankAccountCreatedEvent event) { log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); @@ -102,6 +106,7 @@ public class BankAccountMongoProjection implements Projection { log.info("(BankAccountCreatedEvent) insert: {}", insert); } + @NewSpan private void handle(EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); @@ -113,6 +118,7 @@ public class BankAccountMongoProjection implements Projection { mongoRepository.save(document); } + @NewSpan private void handle(AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); @@ -124,6 +130,7 @@ public class BankAccountMongoProjection implements Projection { mongoRepository.save(document); } + @NewSpan private void handle(BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java index 13ffbcc..8fd1328 100644 --- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java @@ -8,6 +8,7 @@ import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.sleuth.annotation.NewSpan; import org.springframework.stereotype.Service; import java.util.Optional; @@ -22,6 +23,7 @@ public class BankAccountQueryHandler implements BankAccountQueryService { private final BankAccountMongoRepository mongoRepository; @Override + @NewSpan public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) { Optional optionalDocument = mongoRepository.findByAggregateId(query.aggregateID()); if (optionalDocument.isPresent()) { diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index a1f3f2c..9d9afab 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -4,6 +4,7 @@ package com.eventsourcing.es; import com.eventsourcing.es.exceptions.AggregateNotFoundException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.sleuth.annotation.NewSpan; import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Repository; @@ -28,6 +29,7 @@ public class EventStore implements EventStoreDB { private final EventBus eventBus; @Override + @NewSpan public void saveEvents(List events) { if (events.isEmpty()) return; @@ -46,6 +48,7 @@ public class EventStore implements EventStoreDB { } @Override + @NewSpan public List loadEvents(String aggregateId, long version) { final List events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version), (rs, rowNum) -> Event.builder() @@ -78,6 +81,7 @@ public class EventStore implements EventStoreDB { @Override @Transactional + @NewSpan public void save(T aggregate) { final List aggregateEvents = new ArrayList<>(aggregate.getChanges()); @@ -95,6 +99,7 @@ public class EventStore implements EventStoreDB { log.info("(save) saved aggregate: {}", aggregate); } + @NewSpan private void handleConcurrency(String aggregateId) { try { String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class); @@ -105,6 +110,7 @@ public class EventStore implements EventStoreDB { log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId); } + @NewSpan private Optional loadSnapshot(String aggregateId) { final Optional snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder() .aggregateId(rs.getString("aggregate_id")) @@ -119,6 +125,7 @@ public class EventStore implements EventStoreDB { return snapshot; } + @NewSpan private T getAggregate(final String aggregateId, final Class aggregateType) { try { return aggregateType.getConstructor(String.class).newInstance(aggregateId); @@ -127,6 +134,7 @@ public class EventStore implements EventStoreDB { } } + @NewSpan private T getSnapshotFromClass(Optional snapshot, String aggregateId, Class aggregateType) { if (snapshot.isEmpty()) { final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType)); @@ -137,6 +145,7 @@ public class EventStore implements EventStoreDB { @Override @Transactional(readOnly = true) + @NewSpan public T load(String aggregateId, Class aggregateType) { final Optional snapshot = this.loadSnapshot(aggregateId); @@ -156,6 +165,7 @@ public class EventStore implements EventStoreDB { } @Override + @NewSpan public Boolean exists(String aggregateId) { try { final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index bfeb640..78a530d 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -39,4 +39,8 @@ spring.data.mongodb.database=microservices springdoc.swagger-ui.path=/swagger-ui.html -management.endpoints.web.exposure.include=health,prometheus,info \ No newline at end of file +management.endpoints.web.exposure.include=health,prometheus,info + +spring.sleuth.propagation.type=w3c,b3 +spring.sleuth.opentracing.enabled=true +spring.zipkin.base-url=http://localhost:9411 \ No newline at end of file