3 Commits

Author SHA1 Message Date
Alexander
5275894580 feature: add jaeger opentracing improvements 2022-04-14 15:30:08 +03:00
Alexander
ea25e8e8bc feature: add jaeger opentracing 2022-04-14 15:18:33 +03:00
Alexander
501b5a9cec feature: add prometheus monitoring 2022-04-14 15:04:44 +03:00
12 changed files with 28 additions and 122 deletions

View File

@@ -17,11 +17,6 @@
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>

View File

@@ -3,8 +3,6 @@ package com.eventsourcing.bankAccount.commands;
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
import com.eventsourcing.es.EventStoreDB;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
@@ -17,12 +15,9 @@ import org.springframework.stereotype.Service;
public class BankAccountCommandHandler implements BankAccountCommandService {
private final EventStoreDB eventStoreDB;
private static final String SERVICE_NAME = "microservice";
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public String handle(@SpanTag("command") CreateBankAccountCommand command) {
final var aggregate = new BankAccountAggregate(command.aggregateID());
aggregate.createBankAccount(command.email(), command.address(), command.userName());
@@ -34,8 +29,6 @@ public class BankAccountCommandHandler implements BankAccountCommandService {
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void handle(@SpanTag("command") ChangeEmailCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeEmail(command.newEmail());
@@ -45,8 +38,6 @@ public class BankAccountCommandHandler implements BankAccountCommandService {
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void handle(@SpanTag("command") ChangeAddressCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeAddress(command.newAddress());
@@ -56,8 +47,6 @@ public class BankAccountCommandHandler implements BankAccountCommandService {
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void handle(@SpanTag("command") DepositAmountCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.depositBalance(command.amount());

View File

@@ -4,11 +4,9 @@ package com.eventsourcing.bankAccount.delivery;
import com.eventsourcing.bankAccount.commands.*;
import com.eventsourcing.bankAccount.dto.*;
import com.eventsourcing.bankAccount.queries.BankAccountQueryService;
import com.eventsourcing.bankAccount.queries.FindAllOrderByBalance;
import com.eventsourcing.bankAccount.queries.GetBankAccountByIDQuery;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@@ -27,7 +25,9 @@ public class BankAccountController {
@GetMapping("{aggregateId}")
public ResponseEntity<BankAccountResponseDTO> getBankAccount(@PathVariable String aggregateId) {
final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId));
final var query = new GetBankAccountByIDQuery(aggregateId);
log.info("GET bank account query: {}", query);
final var result = queryService.handle(query);
log.info("GET bank account result: {}", result);
return ResponseEntity.ok(result);
}
@@ -35,35 +35,33 @@ public class BankAccountController {
@PostMapping
public ResponseEntity<String> createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) {
final var aggregateID = UUID.randomUUID().toString();
final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()));
final var command = new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address());
final var id = commandService.handle(command);
log.info("CREATE bank account id: {}", id);
return ResponseEntity.status(HttpStatus.CREATED).body(id);
}
@PostMapping(path = "/deposit/{aggregateId}")
public ResponseEntity<Void> depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) {
commandService.handle(new DepositAmountCommand(aggregateId, dto.amount()));
final var command = new DepositAmountCommand(aggregateId, dto.amount());
commandService.handle(command);
log.info("DepositAmountCommand command: {}", command);
return ResponseEntity.ok().build();
}
@PostMapping(path = "/email/{aggregateId}")
public ResponseEntity<Void> changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) {
commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail()));
final var command = new ChangeEmailCommand(aggregateId, dto.newEmail());
commandService.handle(command);
log.info("ChangeEmailCommand command: {}", command);
return ResponseEntity.ok().build();
}
@PostMapping(path = "/address/{aggregateId}")
public ResponseEntity<Void> changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) {
commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress()));
final var command = new ChangeAddressCommand(aggregateId, dto.newAddress());
commandService.handle(command);
log.info("changeAddress command: {}", command);
return ResponseEntity.ok().build();
}
@GetMapping("/balance")
public ResponseEntity<Page<BankAccountResponseDTO>> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page,
@RequestParam(name = "size", defaultValue = "10") Integer size) {
final var result = queryService.handle(new FindAllOrderByBalance(page, size));
log.info("GET all by balance result: {}", result);
return ResponseEntity.ok(result);
}
}

View File

@@ -4,8 +4,6 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
import com.eventsourcing.bankAccount.exceptions.InvalidAddressException;
import com.eventsourcing.bankAccount.exceptions.InvalidEmailException;
import com.eventsourcing.es.AggregateRoot;
import com.eventsourcing.es.Event;
import com.eventsourcing.es.SerializerUtils;
@@ -59,13 +57,13 @@ public class BankAccountAggregate extends AggregateRoot {
private void handle(final EmailChangedEvent event) {
Objects.requireNonNull(event.getNewEmail());
if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
if (event.getNewEmail().isBlank()) throw new RuntimeException("invalid email address");
this.email = event.getNewEmail();
}
private void handle(final AddressUpdatedEvent event) {
Objects.requireNonNull(event.getNewAddress());
if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
if (event.getNewAddress().isBlank()) throw new RuntimeException("invalid address");
this.address = event.getNewAddress();
}

View File

@@ -1,11 +0,0 @@
package com.eventsourcing.bankAccount.exceptions;
public class InvalidAddressException extends RuntimeException {
public InvalidAddressException() {
super();
}
public InvalidAddressException(String address) {
super("invalid address: " + address);
}
}

View File

@@ -1,11 +0,0 @@
package com.eventsourcing.bankAccount.exceptions;
public class InvalidEmailException extends RuntimeException {
public InvalidEmailException() {
super();
}
public InvalidEmailException(String email) {
super("invalid email address: " + email);
}
}

View File

@@ -13,8 +13,6 @@ import com.eventsourcing.es.EventStoreDB;
import com.eventsourcing.es.Projection;
import com.eventsourcing.es.SerializerUtils;
import com.eventsourcing.mappers.BankAccountMapper;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
@@ -28,6 +26,7 @@ import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@Service
@Slf4j
@@ -36,23 +35,24 @@ public class BankAccountMongoProjection implements Projection {
private final BankAccountMongoRepository mongoRepository;
private final EventStoreDB eventStoreDB;
private static final String SERVICE_NAME = "microservice";
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
groupId = "${microservice.kafka.groupId}",
concurrency = "${microservice.kafka.default-concurrency}")
public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data));
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp());
log.info("(BankAccountMongoProjection) data: {}", new String(data));
try {
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
this.processEvents(Arrays.stream(events).toList());
ack.acknowledge();
log.info("ack events: {}", Arrays.toString(events));
} catch (Exception ex) {
} catch (Exception e) {
ack.nack(100);
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex);
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp());
log.error("bankAccountMongoProjectionListener: {}", e.getMessage());
}
}
@@ -73,8 +73,6 @@ public class BankAccountMongoProjection implements Projection {
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public void when(@SpanTag("event") Event event) {
final var aggregateId = event.getAggregateId();
log.info("(when) >>>>> aggregateId: {}", aggregateId);
@@ -112,7 +110,7 @@ public class BankAccountMongoProjection implements Projection {
@NewSpan
private void handle(@SpanTag("event") EmailChangedEvent event) {
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
@@ -124,7 +122,7 @@ public class BankAccountMongoProjection implements Projection {
@NewSpan
private void handle(@SpanTag("event") AddressUpdatedEvent event) {
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
@@ -136,7 +134,7 @@ public class BankAccountMongoProjection implements Projection {
@NewSpan
private void handle(@SpanTag("event") BalanceDepositedEvent event) {
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());

View File

@@ -6,15 +6,10 @@ import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
import com.eventsourcing.es.EventStoreDB;
import com.eventsourcing.mappers.BankAccountMapper;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import java.util.Optional;
@@ -27,12 +22,9 @@ public class BankAccountQueryHandler implements BankAccountQueryService {
private final EventStoreDB eventStoreDB;
private final BankAccountMongoRepository mongoRepository;
private static final String SERVICE_NAME = "microservice";
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
if (optionalDocument.isPresent()) {
@@ -40,20 +32,11 @@ public class BankAccountQueryHandler implements BankAccountQueryService {
}
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
return bankAccountResponseDTO;
}
@Override
@NewSpan
@Retry(name = SERVICE_NAME)
@CircuitBreaker(name = SERVICE_NAME)
public Page<BankAccountResponseDTO> handle(@SpanTag("query") FindAllOrderByBalance query) {
return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance")))
.map(BankAccountMapper::bankAccountResponseDTOFromDocument);
}
}

View File

@@ -1,9 +1,7 @@
package com.eventsourcing.bankAccount.queries;
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
import org.springframework.data.domain.Page;
public interface BankAccountQueryService {
BankAccountResponseDTO handle(GetBankAccountByIDQuery query);
Page<BankAccountResponseDTO> handle(FindAllOrderByBalance query);
}

View File

@@ -1,4 +0,0 @@
package com.eventsourcing.bankAccount.queries;
public record FindAllOrderByBalance(int page, int size) {
}

View File

@@ -28,6 +28,7 @@ public class KafkaProducerConfig {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks());
// producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfigProperties.getCompressionType());
producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getDeliveryTimeoutMs());
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfigProperties.getMaxRequestSize());

View File

@@ -43,32 +43,4 @@ management.endpoints.web.exposure.include=health,prometheus,info
spring.sleuth.propagation.type=w3c,b3
spring.sleuth.opentracing.enabled=true
spring.zipkin.base-url=http://localhost:9411
resilience4j.retry.instances.microservice.max-attempts=3
resilience4j.retry.instances.microservice.waitDuration=1s
resilience4j.retry.instances.microservice.enableExponentialBackoff=true
resilience4j.retry.instances.microservice.exponentialBackoffMultiplier=2
resilience4j.retry.instances.microservice.ignore-exceptions=com.eventsourcing.es.exceptions.AggregateNotFoundException
resilience4j.circuitbreaker.instances.microservice.registerHealthIndicator=true
resilience4j.circuitbreaker.instances.microservice.slidingWindowSize=5
resilience4j.circuitbreaker.instances.microservice.permittedNumberOfCallsInHalfOpenState=3
resilience4j.circuitbreaker.instances.microservice.slidingWindowType=TIME_BASED
resilience4j.circuitbreaker.instances.microservice.minimumNumberOfCalls=10
resilience4j.circuitbreaker.instances.microservice.waitDurationInOpenState=20s
resilience4j.circuitbreaker.instances.microservice.failureRateThreshold=30
resilience4j.circuitbreaker.instances.microservice.eventConsumerBufferSize=10
resilience4j.thread-pool-bulkhead.instances.microservice.maxThreadPoolSize=1
resilience4j.thread-pool-bulkhead.instances.microservice.coreThreadPoolSize=1
resilience4j.thread-pool-bulkhead.instances.microservice.queueCapacity=1
resilience4j.timelimiter.instances.microservice.timeoutDuration=3s
resilience4j.timelimiter.instances.microservice.cancelRunningFuture=true
spring.jdbc.template.query-timeout=3
spring.zipkin.base-url=http://localhost:9411