diff --git a/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java b/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java index 76464a9..a9aa006 100644 --- a/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/commands/BankAccountCommandHandler.java @@ -6,18 +6,19 @@ import com.eventsourcing.es.EventStoreDB; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.annotation.NewSpan; +import org.springframework.cloud.sleuth.annotation.SpanTag; import org.springframework.stereotype.Service; @RequiredArgsConstructor @Slf4j @Service -public class BankAccountCommandHandler implements BankAccountCommandService{ +public class BankAccountCommandHandler implements BankAccountCommandService { private final EventStoreDB eventStoreDB; @Override @NewSpan - public String handle(CreateBankAccountCommand command) { + public String handle(@SpanTag("command") CreateBankAccountCommand command) { final var aggregate = new BankAccountAggregate(command.aggregateID()); aggregate.createBankAccount(command.email(), command.address(), command.userName()); eventStoreDB.save(aggregate); @@ -28,7 +29,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ @Override @NewSpan - public void handle(ChangeEmailCommand command) { + public void handle(@SpanTag("command") ChangeEmailCommand command) { final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class); aggregate.changeEmail(command.newEmail()); eventStoreDB.save(aggregate); @@ -37,7 +38,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ @Override @NewSpan - public void handle(ChangeAddressCommand command) { + public void handle(@SpanTag("command") ChangeAddressCommand command) { final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class); aggregate.changeAddress(command.newAddress()); eventStoreDB.save(aggregate); @@ -46,7 +47,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{ @Override @NewSpan - public void handle(DepositAmountCommand command) { + public void handle(@SpanTag("command") DepositAmountCommand command) { final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class); aggregate.depositBalance(command.amount()); eventStoreDB.save(aggregate); diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 6eb4c1f..6d9c09c 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -16,6 +16,7 @@ import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.annotation.NewSpan; +import org.springframework.cloud.sleuth.annotation.SpanTag; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.Acknowledgment; @@ -56,7 +57,7 @@ public class BankAccountMongoProjection implements Projection { } @NewSpan - private void processEvents(List events) { + private void processEvents(@SpanTag("events") List events) { if (events.isEmpty()) return; try { @@ -72,7 +73,7 @@ public class BankAccountMongoProjection implements Projection { @Override @NewSpan - public void when(Event event) { + public void when(@SpanTag("event") Event event) { final var aggregateId = event.getAggregateId(); log.info("(when) >>>>> aggregateId: {}", aggregateId); @@ -91,7 +92,7 @@ public class BankAccountMongoProjection implements Projection { @NewSpan - private void handle(BankAccountCreatedEvent event) { + private void handle(@SpanTag("event") BankAccountCreatedEvent event) { log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); final var document = BankAccountDocument.builder() @@ -107,7 +108,7 @@ public class BankAccountMongoProjection implements Projection { } @NewSpan - private void handle(EmailChangedEvent event) { + private void handle(@SpanTag("event") EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); if (documentOptional.isEmpty()) @@ -119,7 +120,7 @@ public class BankAccountMongoProjection implements Projection { } @NewSpan - private void handle(AddressUpdatedEvent event) { + private void handle(@SpanTag("event") AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); if (documentOptional.isEmpty()) @@ -131,7 +132,7 @@ public class BankAccountMongoProjection implements Projection { } @NewSpan - private void handle(BalanceDepositedEvent event) { + private void handle(@SpanTag("event") BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); if (documentOptional.isEmpty()) diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java index 8fd1328..25f509f 100644 --- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java @@ -9,6 +9,7 @@ import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.annotation.NewSpan; +import org.springframework.cloud.sleuth.annotation.SpanTag; import org.springframework.stereotype.Service; import java.util.Optional; @@ -24,7 +25,7 @@ public class BankAccountQueryHandler implements BankAccountQueryService { @Override @NewSpan - public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) { + public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) { Optional optionalDocument = mongoRepository.findByAggregateId(query.aggregateID()); if (optionalDocument.isPresent()) { return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get()); diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index 9d9afab..bbbca34 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -5,6 +5,7 @@ import com.eventsourcing.es.exceptions.AggregateNotFoundException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.annotation.NewSpan; +import org.springframework.cloud.sleuth.annotation.SpanTag; import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Repository; @@ -30,7 +31,7 @@ public class EventStore implements EventStoreDB { @Override @NewSpan - public void saveEvents(List events) { + public void saveEvents(@SpanTag("events") List events) { if (events.isEmpty()) return; final List changes = new ArrayList<>(events); @@ -49,7 +50,7 @@ public class EventStore implements EventStoreDB { @Override @NewSpan - public List loadEvents(String aggregateId, long version) { + public List loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) { final List events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version), (rs, rowNum) -> Event.builder() .aggregateId(rs.getString("aggregate_id")) @@ -65,7 +66,8 @@ public class EventStore implements EventStoreDB { return events; } - private void saveSnapshot(T aggregate) { + @NewSpan + private void saveSnapshot(@SpanTag("aggregate") T aggregate) { aggregate.toSnapshot(); final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate); @@ -82,7 +84,7 @@ public class EventStore implements EventStoreDB { @Override @Transactional @NewSpan - public void save(T aggregate) { + public void save(@SpanTag("aggregate") T aggregate) { final List aggregateEvents = new ArrayList<>(aggregate.getChanges()); if (aggregate.getVersion() > 1) { @@ -100,7 +102,7 @@ public class EventStore implements EventStoreDB { } @NewSpan - private void handleConcurrency(String aggregateId) { + private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) { try { String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class); log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID); @@ -111,7 +113,7 @@ public class EventStore implements EventStoreDB { } @NewSpan - private Optional loadSnapshot(String aggregateId) { + private Optional loadSnapshot(@SpanTag("aggregateId") String aggregateId) { final Optional snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder() .aggregateId(rs.getString("aggregate_id")) .aggregateType(rs.getString("aggregate_type")) @@ -126,7 +128,7 @@ public class EventStore implements EventStoreDB { } @NewSpan - private T getAggregate(final String aggregateId, final Class aggregateType) { + private T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class aggregateType) { try { return aggregateType.getConstructor(String.class).newInstance(aggregateId); } catch (Exception ex) { @@ -135,7 +137,7 @@ public class EventStore implements EventStoreDB { } @NewSpan - private T getSnapshotFromClass(Optional snapshot, String aggregateId, Class aggregateType) { + private T getSnapshotFromClass(@SpanTag("snapshot") Optional snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class aggregateType) { if (snapshot.isEmpty()) { final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType)); return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType); @@ -146,7 +148,7 @@ public class EventStore implements EventStoreDB { @Override @Transactional(readOnly = true) @NewSpan - public T load(String aggregateId, Class aggregateType) { + public T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class aggregateType) { final Optional snapshot = this.loadSnapshot(aggregateId); @@ -166,7 +168,7 @@ public class EventStore implements EventStoreDB { @Override @NewSpan - public Boolean exists(String aggregateId) { + public Boolean exists(@SpanTag("aggregateId") String aggregateId) { try { final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class); log.info("aggregate exists id: {}", id); diff --git a/src/main/java/com/eventsourcing/es/KafkaEventBus.java b/src/main/java/com/eventsourcing/es/KafkaEventBus.java index 68b0ce4..9a7373d 100644 --- a/src/main/java/com/eventsourcing/es/KafkaEventBus.java +++ b/src/main/java/com/eventsourcing/es/KafkaEventBus.java @@ -5,6 +5,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.sleuth.annotation.NewSpan; +import org.springframework.cloud.sleuth.annotation.SpanTag; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @@ -21,7 +23,8 @@ public class KafkaEventBus implements EventBus { private String bankAccountTopicName; @Override - public void publish(List events) { + @NewSpan + public void publish(@SpanTag("events") List events) { final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{})); final ProducerRecord record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);