diff --git a/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java b/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java index cc16d84..0fc5f49 100644 --- a/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java +++ b/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java @@ -4,9 +4,11 @@ package com.eventsourcing.bankAccount.delivery; import com.eventsourcing.bankAccount.commands.*; import com.eventsourcing.bankAccount.dto.*; import com.eventsourcing.bankAccount.queries.BankAccountQueryService; +import com.eventsourcing.bankAccount.queries.FindAllOrderByBalance; import com.eventsourcing.bankAccount.queries.GetBankAccountByIDQuery; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.domain.Page; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -25,8 +27,7 @@ public class BankAccountController { @GetMapping("{aggregateId}") public ResponseEntity getBankAccount(@PathVariable String aggregateId) { - final var query = new GetBankAccountByIDQuery(aggregateId); - final var result = queryService.handle(query); + final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId)); log.info("GET bank account result: {}", result); return ResponseEntity.ok(result); } @@ -34,33 +35,35 @@ public class BankAccountController { @PostMapping public ResponseEntity createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) { final var aggregateID = UUID.randomUUID().toString(); - final var command = new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()); - final var id = commandService.handle(command); + final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address())); log.info("CREATE bank account id: {}", id); return ResponseEntity.status(HttpStatus.CREATED).body(id); } @PostMapping(path = "/deposit/{aggregateId}") public ResponseEntity depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) { - final var command = new DepositAmountCommand(aggregateId, dto.amount()); - commandService.handle(command); - log.info("DepositAmountCommand command: {}", command); + commandService.handle(new DepositAmountCommand(aggregateId, dto.amount())); return ResponseEntity.ok().build(); } @PostMapping(path = "/email/{aggregateId}") public ResponseEntity changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) { - final var command = new ChangeEmailCommand(aggregateId, dto.newEmail()); - commandService.handle(command); - log.info("ChangeEmailCommand command: {}", command); + commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail())); return ResponseEntity.ok().build(); } @PostMapping(path = "/address/{aggregateId}") public ResponseEntity changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) { - final var command = new ChangeAddressCommand(aggregateId, dto.newAddress()); - commandService.handle(command); - log.info("changeAddress command: {}", command); + commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress())); return ResponseEntity.ok().build(); } + + @GetMapping("/balance") + public ResponseEntity> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page, + @RequestParam(name = "size", defaultValue = "10") Integer size) { + + final var result = queryService.handle(new FindAllOrderByBalance(page, size)); + log.info("GET all by balance result: {}", result); + return ResponseEntity.ok(result); + } } diff --git a/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountAggregate.java b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountAggregate.java index 6013a7e..534bb95 100644 --- a/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountAggregate.java +++ b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountAggregate.java @@ -4,6 +4,8 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent; import com.eventsourcing.bankAccount.events.EmailChangedEvent; +import com.eventsourcing.bankAccount.exceptions.InvalidAddressException; +import com.eventsourcing.bankAccount.exceptions.InvalidEmailException; import com.eventsourcing.es.AggregateRoot; import com.eventsourcing.es.Event; import com.eventsourcing.es.SerializerUtils; @@ -57,13 +59,13 @@ public class BankAccountAggregate extends AggregateRoot { private void handle(final EmailChangedEvent event) { Objects.requireNonNull(event.getNewEmail()); - if (event.getNewEmail().isBlank()) throw new RuntimeException("invalid email address"); + if (event.getNewEmail().isBlank()) throw new InvalidEmailException(); this.email = event.getNewEmail(); } private void handle(final AddressUpdatedEvent event) { Objects.requireNonNull(event.getNewAddress()); - if (event.getNewAddress().isBlank()) throw new RuntimeException("invalid address"); + if (event.getNewAddress().isBlank()) throw new InvalidAddressException(); this.address = event.getNewAddress(); } diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 6d9c09c..5073316 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -13,6 +13,8 @@ import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; import com.eventsourcing.mappers.BankAccountMapper; +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import io.github.resilience4j.retry.annotation.Retry; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.annotation.NewSpan; @@ -26,7 +28,6 @@ import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; -import java.util.Optional; @Service @Slf4j @@ -35,24 +36,23 @@ public class BankAccountMongoProjection implements Projection { private final BankAccountMongoRepository mongoRepository; private final EventStoreDB eventStoreDB; + private static final String SERVICE_NAME = "microservice"; @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"}, groupId = "${microservice.kafka.groupId}", concurrency = "${microservice.kafka.default-concurrency}") public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) { - log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp()); - log.info("(BankAccountMongoProjection) data: {}", new String(data)); + log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data)); try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); this.processEvents(Arrays.stream(events).toList()); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); - } catch (Exception e) { + } catch (Exception ex) { ack.nack(100); - log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp()); - log.error("bankAccountMongoProjectionListener: {}", e.getMessage()); + log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex); } } @@ -73,6 +73,8 @@ public class BankAccountMongoProjection implements Projection { @Override @NewSpan + @Retry(name = SERVICE_NAME) + @CircuitBreaker(name = SERVICE_NAME) public void when(@SpanTag("event") Event event) { final var aggregateId = event.getAggregateId(); log.info("(when) >>>>> aggregateId: {}", aggregateId); @@ -110,7 +112,7 @@ public class BankAccountMongoProjection implements Projection { @NewSpan private void handle(@SpanTag("event") EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); - Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); @@ -122,7 +124,7 @@ public class BankAccountMongoProjection implements Projection { @NewSpan private void handle(@SpanTag("event") AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); - Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); @@ -134,7 +136,7 @@ public class BankAccountMongoProjection implements Projection { @NewSpan private void handle(@SpanTag("event") BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); - Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java index 25f509f..8a6ef59 100644 --- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java @@ -6,10 +6,15 @@ import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO; import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository; import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.mappers.BankAccountMapper; +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import io.github.resilience4j.retry.annotation.Retry; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.annotation.NewSpan; import org.springframework.cloud.sleuth.annotation.SpanTag; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; import java.util.Optional; @@ -22,9 +27,12 @@ public class BankAccountQueryHandler implements BankAccountQueryService { private final EventStoreDB eventStoreDB; private final BankAccountMongoRepository mongoRepository; + private static final String SERVICE_NAME = "microservice"; @Override @NewSpan + @Retry(name = SERVICE_NAME) + @CircuitBreaker(name = SERVICE_NAME) public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) { Optional optionalDocument = mongoRepository.findByAggregateId(query.aggregateID()); if (optionalDocument.isPresent()) { @@ -32,11 +40,20 @@ public class BankAccountQueryHandler implements BankAccountQueryService { } final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class); - BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate)); + final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate)); log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument); final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate); log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO); return bankAccountResponseDTO; } + + @Override + @NewSpan + @Retry(name = SERVICE_NAME) + @CircuitBreaker(name = SERVICE_NAME) + public Page handle(@SpanTag("query") FindAllOrderByBalance query) { + return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance"))) + .map(BankAccountMapper::bankAccountResponseDTOFromDocument); + } } diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryService.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryService.java index d5eaed4..754a17d 100644 --- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryService.java +++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryService.java @@ -1,7 +1,9 @@ package com.eventsourcing.bankAccount.queries; import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO; +import org.springframework.data.domain.Page; public interface BankAccountQueryService { BankAccountResponseDTO handle(GetBankAccountByIDQuery query); + Page handle(FindAllOrderByBalance query); } diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/FindAllOrderByBalance.java b/src/main/java/com/eventsourcing/bankAccount/queries/FindAllOrderByBalance.java new file mode 100644 index 0000000..5aee0f5 --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/queries/FindAllOrderByBalance.java @@ -0,0 +1,4 @@ +package com.eventsourcing.bankAccount.queries; + +public record FindAllOrderByBalance(int page, int size) { +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java b/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java index d1e3eac..20372be 100644 --- a/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java +++ b/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java @@ -28,7 +28,6 @@ public class KafkaProducerConfig { producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks()); -// producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfigProperties.getCompressionType()); producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getDeliveryTimeoutMs()); producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfigProperties.getMaxRequestSize()); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 9d720c2..b42cfe2 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -50,15 +50,16 @@ resilience4j.retry.instances.microservice.max-attempts=3 resilience4j.retry.instances.microservice.waitDuration=1s resilience4j.retry.instances.microservice.enableExponentialBackoff=true resilience4j.retry.instances.microservice.exponentialBackoffMultiplier=2 +resilience4j.retry.instances.microservice.ignore-exceptions=com.eventsourcing.es.exceptions.AggregateNotFoundException resilience4j.circuitbreaker.instances.microservice.registerHealthIndicator=true resilience4j.circuitbreaker.instances.microservice.slidingWindowSize=5 resilience4j.circuitbreaker.instances.microservice.permittedNumberOfCallsInHalfOpenState=3 resilience4j.circuitbreaker.instances.microservice.slidingWindowType=TIME_BASED -resilience4j.circuitbreaker.instances.microservice.minimumNumberOfCalls=20 -resilience4j.circuitbreaker.instances.microservice.waitDurationInOpenState=50s -resilience4j.circuitbreaker.instances.microservice.failureRateThreshold=50 +resilience4j.circuitbreaker.instances.microservice.minimumNumberOfCalls=10 +resilience4j.circuitbreaker.instances.microservice.waitDurationInOpenState=20s +resilience4j.circuitbreaker.instances.microservice.failureRateThreshold=30 resilience4j.circuitbreaker.instances.microservice.eventConsumerBufferSize=10 @@ -67,4 +68,7 @@ resilience4j.thread-pool-bulkhead.instances.microservice.coreThreadPoolSize=1 resilience4j.thread-pool-bulkhead.instances.microservice.queueCapacity=1 resilience4j.timelimiter.instances.microservice.timeoutDuration=3s -resilience4j.timelimiter.instances.microservice.cancelRunningFuture=true \ No newline at end of file +resilience4j.timelimiter.instances.microservice.cancelRunningFuture=true + +spring.jdbc.template.query-timeout=3 +