feature: add jaeger opentracing
This commit is contained in:
10
pom.xml
10
pom.xml
@@ -17,6 +17,16 @@
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-sleuth</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.micrometer</groupId>
|
||||
<artifactId>micrometer-registry-prometheus</artifactId>
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@@ -15,6 +16,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
private final EventStoreDB eventStoreDB;
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public String handle(CreateBankAccountCommand command) {
|
||||
final var aggregate = new BankAccountAggregate(command.aggregateID());
|
||||
aggregate.createBankAccount(command.email(), command.address(), command.userName());
|
||||
@@ -25,6 +27,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public void handle(ChangeEmailCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeEmail(command.newEmail());
|
||||
@@ -33,6 +36,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public void handle(ChangeAddressCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeAddress(command.newAddress());
|
||||
@@ -41,6 +45,7 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public void handle(DepositAmountCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.depositBalance(command.amount());
|
||||
|
||||
@@ -15,6 +15,7 @@ import com.eventsourcing.es.SerializerUtils;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
@@ -54,6 +55,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
}
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void processEvents(List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
@@ -69,6 +71,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public void when(Event event) {
|
||||
final var aggregateId = event.getAggregateId();
|
||||
log.info("(when) >>>>> aggregateId: {}", aggregateId);
|
||||
@@ -87,6 +90,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
}
|
||||
|
||||
|
||||
@NewSpan
|
||||
private void handle(BankAccountCreatedEvent event) {
|
||||
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
|
||||
@@ -102,6 +106,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
log.info("(BankAccountCreatedEvent) insert: {}", insert);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void handle(EmailChangedEvent event) {
|
||||
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
@@ -113,6 +118,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void handle(AddressUpdatedEvent event) {
|
||||
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
@@ -124,6 +130,7 @@ public class BankAccountMongoProjection implements Projection {
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void handle(BalanceDepositedEvent event) {
|
||||
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.eventsourcing.es.EventStoreDB;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
@@ -22,6 +23,7 @@ public class BankAccountQueryHandler implements BankAccountQueryService {
|
||||
private final BankAccountMongoRepository mongoRepository;
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
|
||||
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
|
||||
if (optionalDocument.isPresent()) {
|
||||
|
||||
@@ -4,6 +4,7 @@ package com.eventsourcing.es;
|
||||
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
@@ -28,6 +29,7 @@ public class EventStore implements EventStoreDB {
|
||||
private final EventBus eventBus;
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public void saveEvents(List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
@@ -46,6 +48,7 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public List<Event> loadEvents(String aggregateId, long version) {
|
||||
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
|
||||
(rs, rowNum) -> Event.builder()
|
||||
@@ -78,6 +81,7 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> void save(T aggregate) {
|
||||
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
|
||||
|
||||
@@ -95,6 +99,7 @@ public class EventStore implements EventStoreDB {
|
||||
log.info("(save) saved aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void handleConcurrency(String aggregateId) {
|
||||
try {
|
||||
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
@@ -105,6 +110,7 @@ public class EventStore implements EventStoreDB {
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private Optional<Snapshot> loadSnapshot(String aggregateId) {
|
||||
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
@@ -119,6 +125,7 @@ public class EventStore implements EventStoreDB {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
|
||||
try {
|
||||
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
|
||||
@@ -127,6 +134,7 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> T getSnapshotFromClass(Optional<Snapshot> snapshot, String aggregateId, Class<T> aggregateType) {
|
||||
if (snapshot.isEmpty()) {
|
||||
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
|
||||
@@ -137,6 +145,7 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> T load(String aggregateId, Class<T> aggregateType) {
|
||||
|
||||
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
|
||||
@@ -156,6 +165,7 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
public Boolean exists(String aggregateId) {
|
||||
try {
|
||||
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
|
||||
@@ -39,4 +39,8 @@ spring.data.mongodb.database=microservices
|
||||
|
||||
springdoc.swagger-ui.path=/swagger-ui.html
|
||||
|
||||
management.endpoints.web.exposure.include=health,prometheus,info
|
||||
management.endpoints.web.exposure.include=health,prometheus,info
|
||||
|
||||
spring.sleuth.propagation.type=w3c,b3
|
||||
spring.sleuth.opentracing.enabled=true
|
||||
spring.zipkin.base-url=http://localhost:9411
|
||||
Reference in New Issue
Block a user