feature: add jaeger opentracing improvements

This commit is contained in:
Alexander
2022-04-14 15:30:08 +03:00
parent eb019a12f0
commit d56dfa6208
5 changed files with 31 additions and 23 deletions

View File

@@ -6,18 +6,19 @@ import com.eventsourcing.es.EventStoreDB;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Slf4j
@Service
public class BankAccountCommandHandler implements BankAccountCommandService{
public class BankAccountCommandHandler implements BankAccountCommandService {
private final EventStoreDB eventStoreDB;
@Override
@NewSpan
public String handle(CreateBankAccountCommand command) {
public String handle(@SpanTag("command") CreateBankAccountCommand command) {
final var aggregate = new BankAccountAggregate(command.aggregateID());
aggregate.createBankAccount(command.email(), command.address(), command.userName());
eventStoreDB.save(aggregate);
@@ -28,7 +29,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
@Override
@NewSpan
public void handle(ChangeEmailCommand command) {
public void handle(@SpanTag("command") ChangeEmailCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeEmail(command.newEmail());
eventStoreDB.save(aggregate);
@@ -37,7 +38,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
@Override
@NewSpan
public void handle(ChangeAddressCommand command) {
public void handle(@SpanTag("command") ChangeAddressCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeAddress(command.newAddress());
eventStoreDB.save(aggregate);
@@ -46,7 +47,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
@Override
@NewSpan
public void handle(DepositAmountCommand command) {
public void handle(@SpanTag("command") DepositAmountCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.depositBalance(command.amount());
eventStoreDB.save(aggregate);

View File

@@ -16,6 +16,7 @@ import com.eventsourcing.mappers.BankAccountMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.Acknowledgment;
@@ -56,7 +57,7 @@ public class BankAccountMongoProjection implements Projection {
}
@NewSpan
private void processEvents(List<Event> events) {
private void processEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
try {
@@ -72,7 +73,7 @@ public class BankAccountMongoProjection implements Projection {
@Override
@NewSpan
public void when(Event event) {
public void when(@SpanTag("event") Event event) {
final var aggregateId = event.getAggregateId();
log.info("(when) >>>>> aggregateId: {}", aggregateId);
@@ -91,7 +92,7 @@ public class BankAccountMongoProjection implements Projection {
@NewSpan
private void handle(BankAccountCreatedEvent event) {
private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var document = BankAccountDocument.builder()
@@ -107,7 +108,7 @@ public class BankAccountMongoProjection implements Projection {
}
@NewSpan
private void handle(EmailChangedEvent event) {
private void handle(@SpanTag("event") EmailChangedEvent event) {
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
@@ -119,7 +120,7 @@ public class BankAccountMongoProjection implements Projection {
}
@NewSpan
private void handle(AddressUpdatedEvent event) {
private void handle(@SpanTag("event") AddressUpdatedEvent event) {
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
@@ -131,7 +132,7 @@ public class BankAccountMongoProjection implements Projection {
}
@NewSpan
private void handle(BalanceDepositedEvent event) {
private void handle(@SpanTag("event") BalanceDepositedEvent event) {
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())

View File

@@ -9,6 +9,7 @@ import com.eventsourcing.mappers.BankAccountMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.stereotype.Service;
import java.util.Optional;
@@ -24,7 +25,7 @@ public class BankAccountQueryHandler implements BankAccountQueryService {
@Override
@NewSpan
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
if (optionalDocument.isPresent()) {
return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());

View File

@@ -5,6 +5,7 @@ import com.eventsourcing.es.exceptions.AggregateNotFoundException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;
@@ -30,7 +31,7 @@ public class EventStore implements EventStoreDB {
@Override
@NewSpan
public void saveEvents(List<Event> events) {
public void saveEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
final List<Event> changes = new ArrayList<>(events);
@@ -49,7 +50,7 @@ public class EventStore implements EventStoreDB {
@Override
@NewSpan
public List<Event> loadEvents(String aggregateId, long version) {
public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
(rs, rowNum) -> Event.builder()
.aggregateId(rs.getString("aggregate_id"))
@@ -65,7 +66,8 @@ public class EventStore implements EventStoreDB {
return events;
}
private <T extends AggregateRoot> void saveSnapshot(T aggregate) {
@NewSpan
private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
aggregate.toSnapshot();
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
@@ -82,7 +84,7 @@ public class EventStore implements EventStoreDB {
@Override
@Transactional
@NewSpan
public <T extends AggregateRoot> void save(T aggregate) {
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
@@ -100,7 +102,7 @@ public class EventStore implements EventStoreDB {
}
@NewSpan
private void handleConcurrency(String aggregateId) {
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
try {
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
@@ -111,7 +113,7 @@ public class EventStore implements EventStoreDB {
}
@NewSpan
private Optional<Snapshot> loadSnapshot(String aggregateId) {
private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
.aggregateId(rs.getString("aggregate_id"))
.aggregateType(rs.getString("aggregate_type"))
@@ -126,7 +128,7 @@ public class EventStore implements EventStoreDB {
}
@NewSpan
private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
try {
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
} catch (Exception ex) {
@@ -135,7 +137,7 @@ public class EventStore implements EventStoreDB {
}
@NewSpan
private <T extends AggregateRoot> T getSnapshotFromClass(Optional<Snapshot> snapshot, String aggregateId, Class<T> aggregateType) {
private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
if (snapshot.isEmpty()) {
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
@@ -146,7 +148,7 @@ public class EventStore implements EventStoreDB {
@Override
@Transactional(readOnly = true)
@NewSpan
public <T extends AggregateRoot> T load(String aggregateId, Class<T> aggregateType) {
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
@@ -166,7 +168,7 @@ public class EventStore implements EventStoreDB {
@Override
@NewSpan
public Boolean exists(String aggregateId) {
public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
try {
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
log.info("aggregate exists id: {}", id);

View File

@@ -5,6 +5,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@@ -21,7 +23,8 @@ public class KafkaEventBus implements EventBus {
private String bankAccountTopicName;
@Override
public void publish(List<Event> events) {
@NewSpan
public void publish(@SpanTag("events") List<Event> events) {
final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);