Compare commits
20 Commits
feature/ka
...
feature/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
da74902398 | ||
|
|
ea82a3ed0f | ||
|
|
995b1c5457 | ||
|
|
d56dfa6208 | ||
|
|
eb019a12f0 | ||
|
|
f9a50bf1d9 | ||
|
|
a87fde57fe | ||
|
|
ff088e891f | ||
|
|
5f9dee9905 | ||
|
|
3e642c079c | ||
|
|
3f6d872444 | ||
|
|
747d6da0b3 | ||
|
|
920ce91496 | ||
|
|
c513bbfc39 | ||
|
|
165cd1ebee | ||
|
|
92df832b48 | ||
|
|
07282ade74 | ||
|
|
406694ca38 | ||
|
|
15ca094c01 | ||
|
|
3b17fed06c |
29
pom.xml
29
pom.xml
@@ -17,6 +17,35 @@
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-spring-boot2</artifactId>
|
||||
<version>1.7.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-sleuth</artifactId>
|
||||
<version>3.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.micrometer</groupId>
|
||||
<artifactId>micrometer-registry-prometheus</artifactId>
|
||||
<version>1.8.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springdoc</groupId>
|
||||
<artifactId>springdoc-openapi-ui</artifactId>
|
||||
<version>1.6.7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-mongodb</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.flywaydb</groupId>
|
||||
<artifactId>flyway-core</artifactId>
|
||||
|
||||
@@ -3,19 +3,27 @@ package com.eventsourcing.bankAccount.commands;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@Service
|
||||
public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
public class BankAccountCommandHandler implements BankAccountCommandService {
|
||||
|
||||
private final EventStoreDB eventStoreDB;
|
||||
private static final String SERVICE_NAME = "microservice";
|
||||
|
||||
@Override
|
||||
public String handle(CreateBankAccountCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public String handle(@SpanTag("command") CreateBankAccountCommand command) {
|
||||
final var aggregate = new BankAccountAggregate(command.aggregateID());
|
||||
aggregate.createBankAccount(command.email(), command.address(), command.userName());
|
||||
eventStoreDB.save(aggregate);
|
||||
@@ -25,7 +33,10 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ChangeEmailCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void handle(@SpanTag("command") ChangeEmailCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeEmail(command.newEmail());
|
||||
eventStoreDB.save(aggregate);
|
||||
@@ -33,7 +44,10 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ChangeAddressCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void handle(@SpanTag("command") ChangeAddressCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.changeAddress(command.newAddress());
|
||||
eventStoreDB.save(aggregate);
|
||||
@@ -41,7 +55,10 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(DepositAmountCommand command) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void handle(@SpanTag("command") DepositAmountCommand command) {
|
||||
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
|
||||
aggregate.depositBalance(command.amount());
|
||||
eventStoreDB.save(aggregate);
|
||||
|
||||
@@ -4,9 +4,11 @@ package com.eventsourcing.bankAccount.delivery;
|
||||
import com.eventsourcing.bankAccount.commands.*;
|
||||
import com.eventsourcing.bankAccount.dto.*;
|
||||
import com.eventsourcing.bankAccount.queries.BankAccountQueryService;
|
||||
import com.eventsourcing.bankAccount.queries.FindAllOrderByBalance;
|
||||
import com.eventsourcing.bankAccount.queries.GetBankAccountByIDQuery;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
@@ -25,9 +27,7 @@ public class BankAccountController {
|
||||
|
||||
@GetMapping("{aggregateId}")
|
||||
public ResponseEntity<BankAccountResponseDTO> getBankAccount(@PathVariable String aggregateId) {
|
||||
final var query = new GetBankAccountByIDQuery(aggregateId);
|
||||
log.info("GET bank account query: {}", query);
|
||||
final var result = queryService.handle(query);
|
||||
final var result = queryService.handle(new GetBankAccountByIDQuery(aggregateId));
|
||||
log.info("GET bank account result: {}", result);
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
@@ -35,33 +35,35 @@ public class BankAccountController {
|
||||
@PostMapping
|
||||
public ResponseEntity<String> createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) {
|
||||
final var aggregateID = UUID.randomUUID().toString();
|
||||
final var command = new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address());
|
||||
final var id = commandService.handle(command);
|
||||
final var id = commandService.handle(new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()));
|
||||
log.info("CREATE bank account id: {}", id);
|
||||
return ResponseEntity.status(HttpStatus.CREATED).body(id);
|
||||
}
|
||||
|
||||
@PostMapping(path = "/deposit/{aggregateId}")
|
||||
public ResponseEntity<Void> depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) {
|
||||
final var command = new DepositAmountCommand(aggregateId, dto.amount());
|
||||
commandService.handle(command);
|
||||
log.info("DepositAmountCommand command: {}", command);
|
||||
commandService.handle(new DepositAmountCommand(aggregateId, dto.amount()));
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@PostMapping(path = "/email/{aggregateId}")
|
||||
public ResponseEntity<Void> changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) {
|
||||
final var command = new ChangeEmailCommand(aggregateId, dto.newEmail());
|
||||
commandService.handle(command);
|
||||
log.info("ChangeEmailCommand command: {}", command);
|
||||
commandService.handle(new ChangeEmailCommand(aggregateId, dto.newEmail()));
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@PostMapping(path = "/address/{aggregateId}")
|
||||
public ResponseEntity<Void> changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) {
|
||||
final var command = new ChangeAddressCommand(aggregateId, dto.newAddress());
|
||||
commandService.handle(command);
|
||||
log.info("changeAddress command: {}", command);
|
||||
commandService.handle(new ChangeAddressCommand(aggregateId, dto.newAddress()));
|
||||
return ResponseEntity.ok().build();
|
||||
}
|
||||
|
||||
@GetMapping("/balance")
|
||||
public ResponseEntity<Page<BankAccountResponseDTO>> getAllOrderByBalance(@RequestParam(name = "page", defaultValue = "0") Integer page,
|
||||
@RequestParam(name = "size", defaultValue = "10") Integer size) {
|
||||
|
||||
final var result = queryService.handle(new FindAllOrderByBalance(page, size));
|
||||
log.info("GET all by balance result: {}", result);
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidAddressException;
|
||||
import com.eventsourcing.bankAccount.exceptions.InvalidEmailException;
|
||||
import com.eventsourcing.es.AggregateRoot;
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.SerializerUtils;
|
||||
@@ -57,13 +59,13 @@ public class BankAccountAggregate extends AggregateRoot {
|
||||
|
||||
private void handle(final EmailChangedEvent event) {
|
||||
Objects.requireNonNull(event.getNewEmail());
|
||||
if (event.getNewEmail().isBlank()) throw new RuntimeException("invalid email address");
|
||||
if (event.getNewEmail().isBlank()) throw new InvalidEmailException();
|
||||
this.email = event.getNewEmail();
|
||||
}
|
||||
|
||||
private void handle(final AddressUpdatedEvent event) {
|
||||
Objects.requireNonNull(event.getNewAddress());
|
||||
if (event.getNewAddress().isBlank()) throw new RuntimeException("invalid address");
|
||||
if (event.getNewAddress().isBlank()) throw new InvalidAddressException();
|
||||
this.address = event.getNewAddress();
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.eventsourcing.bankAccount.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.bson.codecs.pojo.annotations.BsonProperty;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
@Document(collection = "bankAccounts")
|
||||
public class BankAccountDocument {
|
||||
|
||||
@BsonProperty(value = "_id")
|
||||
private String id;
|
||||
|
||||
@BsonProperty(value = "aggregateId")
|
||||
private String aggregateId;
|
||||
|
||||
@BsonProperty(value = "email")
|
||||
private String email;
|
||||
|
||||
@BsonProperty(value = "userName")
|
||||
private String userName;
|
||||
|
||||
@BsonProperty(value = "address")
|
||||
private String address;
|
||||
|
||||
@BsonProperty(value = "balance")
|
||||
private BigDecimal balance;
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class InvalidAddressException extends RuntimeException {
|
||||
public InvalidAddressException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidAddressException(String address) {
|
||||
super("invalid address: " + address);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.eventsourcing.bankAccount.exceptions;
|
||||
|
||||
public class InvalidEmailException extends RuntimeException {
|
||||
public InvalidEmailException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvalidEmailException(String email) {
|
||||
super("invalid email address: " + email);
|
||||
}
|
||||
}
|
||||
@@ -1,55 +1,81 @@
|
||||
package com.eventsourcing.bankAccount.projection;
|
||||
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
|
||||
import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
|
||||
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
|
||||
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
|
||||
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import com.eventsourcing.es.Projection;
|
||||
import com.eventsourcing.es.SerializerUtils;
|
||||
import com.eventsourcing.exceptions.UnknownEventTypeException;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BankAccountMongoProjection implements Projection {
|
||||
|
||||
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
private String bankAccountTopicName;
|
||||
private final BankAccountMongoRepository mongoRepository;
|
||||
private final EventStoreDB eventStoreDB;
|
||||
private static final String SERVICE_NAME = "microservice";
|
||||
|
||||
|
||||
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
|
||||
groupId = "${microservice.kafka.groupId}",
|
||||
concurrency = "${microservice.kafka.default-concurrency}")
|
||||
public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
|
||||
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp());
|
||||
log.info("(BankAccountMongoProjection) data: {}", new String(data));
|
||||
log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}, data: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), new String(data));
|
||||
|
||||
try {
|
||||
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
|
||||
Arrays.stream(events).toList().forEach(this::when);
|
||||
this.processEvents(Arrays.stream(events).toList());
|
||||
ack.acknowledge();
|
||||
log.info("ack events: {}", Arrays.toString(events));
|
||||
} catch (Exception e) {
|
||||
} catch (Exception ex) {
|
||||
ack.nack(100);
|
||||
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp());
|
||||
log.error("bankAccountMongoProjectionListener: {}", e.getMessage());
|
||||
log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
@NewSpan
|
||||
private void processEvents(@SpanTag("events") List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
try {
|
||||
events.forEach(this::when);
|
||||
} catch (Exception ex) {
|
||||
mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
|
||||
final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
|
||||
final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
|
||||
final var result = mongoRepository.save(document);
|
||||
log.info("(processEvents) saved document: {}", result);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void when(Event event) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public void when(@SpanTag("event") Event event) {
|
||||
final var aggregateId = event.getAggregateId();
|
||||
log.info("(when) >>>>> aggregateId: {}", aggregateId);
|
||||
|
||||
@@ -62,33 +88,62 @@ public class BankAccountMongoProjection implements Projection {
|
||||
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
|
||||
case BalanceDepositedEvent.BALANCE_DEPOSITED ->
|
||||
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
|
||||
default -> throw new UnknownEventTypeException(event.getEventType());
|
||||
default -> log.error("unknown event type: {}", event.getEventType());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void handle(BankAccountCreatedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
|
||||
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
|
||||
// final var document = BankAccountDocument.builder()
|
||||
// .aggregateId(event.getAggregateId())
|
||||
// .email(event.getEmail())
|
||||
// .address(event.getAddress())
|
||||
// .userName(event.getUserName())
|
||||
// .balance(BigDecimal.valueOf(0))
|
||||
// .build();
|
||||
final var document = BankAccountDocument.builder()
|
||||
.aggregateId(event.getAggregateId())
|
||||
.email(event.getEmail())
|
||||
.address(event.getAddress())
|
||||
.userName(event.getUserName())
|
||||
.balance(BigDecimal.valueOf(0))
|
||||
.build();
|
||||
|
||||
final var insert = mongoRepository.insert(document);
|
||||
log.info("(BankAccountCreatedEvent) insert: {}", insert);
|
||||
}
|
||||
|
||||
private void handle(EmailChangedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") EmailChangedEvent event) {
|
||||
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
document.setEmail(event.getNewEmail());
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
private void handle(AddressUpdatedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") AddressUpdatedEvent event) {
|
||||
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
document.setAddress(event.getNewAddress());
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
|
||||
private void handle(BalanceDepositedEvent event) {
|
||||
@NewSpan
|
||||
private void handle(@SpanTag("event") BalanceDepositedEvent event) {
|
||||
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
|
||||
final var documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
|
||||
if (documentOptional.isEmpty())
|
||||
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
|
||||
|
||||
final var document = documentOptional.get();
|
||||
final var balance = document.getBalance();
|
||||
final var newBalance = balance.add(event.getAmount());
|
||||
document.setBalance(newBalance);
|
||||
mongoRepository.save(document);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,24 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
|
||||
import com.eventsourcing.es.EventStoreDB;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@@ -15,12 +26,34 @@ import org.springframework.stereotype.Service;
|
||||
public class BankAccountQueryHandler implements BankAccountQueryService {
|
||||
|
||||
private final EventStoreDB eventStoreDB;
|
||||
private final BankAccountMongoRepository mongoRepository;
|
||||
private static final String SERVICE_NAME = "microservice";
|
||||
|
||||
@Override
|
||||
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
|
||||
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
|
||||
if (optionalDocument.isPresent()) {
|
||||
return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
|
||||
}
|
||||
|
||||
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
|
||||
final var savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
|
||||
log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);
|
||||
|
||||
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
|
||||
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
|
||||
return bankAccountResponseDTO;
|
||||
}
|
||||
|
||||
@Override
|
||||
@NewSpan
|
||||
@Retry(name = SERVICE_NAME)
|
||||
@CircuitBreaker(name = SERVICE_NAME)
|
||||
public Page<BankAccountResponseDTO> handle(@SpanTag("query") FindAllOrderByBalance query) {
|
||||
return mongoRepository.findAll(PageRequest.of(query.page(), query.size(), Sort.by("balance")))
|
||||
.map(BankAccountMapper::bankAccountResponseDTOFromDocument);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
import org.springframework.data.domain.Page;
|
||||
|
||||
public interface BankAccountQueryService {
|
||||
BankAccountResponseDTO handle(GetBankAccountByIDQuery query);
|
||||
Page<BankAccountResponseDTO> handle(FindAllOrderByBalance query);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.eventsourcing.bankAccount.queries;
|
||||
|
||||
public record FindAllOrderByBalance(int page, int size) {
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.eventsourcing.bankAccount.repository;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
|
||||
import org.springframework.data.mongodb.repository.MongoRepository;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface BankAccountMongoRepository extends MongoRepository<BankAccountDocument, String> {
|
||||
|
||||
Optional<BankAccountDocument> findByAggregateId(String aggregateId);
|
||||
|
||||
void deleteByAggregateId(String aggregateId);
|
||||
}
|
||||
@@ -28,7 +28,6 @@ public class KafkaProducerConfig {
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks());
|
||||
// producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfigProperties.getCompressionType());
|
||||
producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
|
||||
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getDeliveryTimeoutMs());
|
||||
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfigProperties.getMaxRequestSize());
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.eventsourcing.configuration;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.data.mongodb.core.index.Index;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
@Configuration
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class MongoConfiguration {
|
||||
|
||||
private final MongoTemplate mongoTemplate;
|
||||
|
||||
@PostConstruct
|
||||
public void mongoInit() {
|
||||
final var bankAccounts = mongoTemplate.getCollection("bankAccounts");
|
||||
final var aggregateIdIndex = mongoTemplate.indexOps(BankAccountDocument.class).ensureIndex(new Index("aggregateId", Sort.Direction.ASC).unique());
|
||||
final var indexInfo = mongoTemplate.indexOps(BankAccountDocument.class).getIndexInfo();
|
||||
log.info("MongoDB connected, bankAccounts aggregateId index created: {}", indexInfo);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.eventsourcing.configuration;
|
||||
|
||||
|
||||
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
|
||||
import io.swagger.v3.oas.annotations.info.Contact;
|
||||
import io.swagger.v3.oas.annotations.info.Info;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
@OpenAPIDefinition(info = @Info(title = "Spring CQRS and Event Sourcing Microservice",
|
||||
description = "Spring Postgresql MongoDB Kafka CQRS and Event Sourcing Microservice",
|
||||
contact = @Contact(name = "Alexander Bryksin", email = "alexander.bryksin@yandex.ru", url = "https://github.com/AleksK1NG")))
|
||||
@Component
|
||||
public class SwaggerOpenAPIConfiguration {
|
||||
}
|
||||
@@ -4,6 +4,8 @@ package com.eventsourcing.es;
|
||||
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.dao.EmptyResultDataAccessException;
|
||||
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
@@ -28,7 +30,8 @@ public class EventStore implements EventStoreDB {
|
||||
private final EventBus eventBus;
|
||||
|
||||
@Override
|
||||
public void saveEvents(List<Event> events) {
|
||||
@NewSpan
|
||||
public void saveEvents(@SpanTag("events") List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
final List<Event> changes = new ArrayList<>(events);
|
||||
@@ -46,7 +49,8 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Event> loadEvents(String aggregateId, long version) {
|
||||
@NewSpan
|
||||
public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
|
||||
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
|
||||
(rs, rowNum) -> Event.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
@@ -62,7 +66,8 @@ public class EventStore implements EventStoreDB {
|
||||
return events;
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> void saveSnapshot(T aggregate) {
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
|
||||
aggregate.toSnapshot();
|
||||
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
|
||||
|
||||
@@ -78,7 +83,8 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public <T extends AggregateRoot> void save(T aggregate) {
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
|
||||
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
|
||||
|
||||
if (aggregate.getVersion() > 1) {
|
||||
@@ -95,7 +101,8 @@ public class EventStore implements EventStoreDB {
|
||||
log.info("(save) saved aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
private void handleConcurrency(String aggregateId) {
|
||||
@NewSpan
|
||||
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
|
||||
try {
|
||||
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
|
||||
@@ -105,7 +112,8 @@ public class EventStore implements EventStoreDB {
|
||||
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
|
||||
}
|
||||
|
||||
private Optional<Snapshot> loadSnapshot(String aggregateId) {
|
||||
@NewSpan
|
||||
private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
|
||||
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
|
||||
.aggregateId(rs.getString("aggregate_id"))
|
||||
.aggregateType(rs.getString("aggregate_type"))
|
||||
@@ -119,7 +127,8 @@ public class EventStore implements EventStoreDB {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
|
||||
try {
|
||||
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
|
||||
} catch (Exception ex) {
|
||||
@@ -127,7 +136,8 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends AggregateRoot> T getSnapshotFromClass(Optional<Snapshot> snapshot, String aggregateId, Class<T> aggregateType) {
|
||||
@NewSpan
|
||||
private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
|
||||
if (snapshot.isEmpty()) {
|
||||
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
|
||||
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
|
||||
@@ -137,7 +147,8 @@ public class EventStore implements EventStoreDB {
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public <T extends AggregateRoot> T load(String aggregateId, Class<T> aggregateType) {
|
||||
@NewSpan
|
||||
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
|
||||
|
||||
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
|
||||
|
||||
@@ -156,7 +167,8 @@ public class EventStore implements EventStoreDB {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean exists(String aggregateId) {
|
||||
@NewSpan
|
||||
public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
|
||||
try {
|
||||
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
|
||||
log.info("aggregate exists id: {}", id);
|
||||
|
||||
@@ -5,6 +5,8 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cloud.sleuth.annotation.NewSpan;
|
||||
import org.springframework.cloud.sleuth.annotation.SpanTag;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -21,7 +23,8 @@ public class KafkaEventBus implements EventBus {
|
||||
private String bankAccountTopicName;
|
||||
|
||||
@Override
|
||||
public void publish(List<Event> events) {
|
||||
@NewSpan
|
||||
public void publish(@SpanTag("events") List<Event> events) {
|
||||
final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
|
||||
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.eventsourcing.mappers;
|
||||
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
|
||||
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
|
||||
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
|
||||
|
||||
public final class BankAccountMapper {
|
||||
@@ -19,23 +20,23 @@ public final class BankAccountMapper {
|
||||
);
|
||||
}
|
||||
|
||||
// public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
|
||||
// return new BankAccountResponseDTO(
|
||||
// bankAccountDocument.getAggregateId(),
|
||||
// bankAccountDocument.getEmail(),
|
||||
// bankAccountDocument.getAddress(),
|
||||
// bankAccountDocument.getUserName(),
|
||||
// bankAccountDocument.getBalance()
|
||||
// );
|
||||
// }
|
||||
//
|
||||
// public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
|
||||
// return BankAccountDocument.builder()
|
||||
// .aggregateId(bankAccountAggregate.getId())
|
||||
// .email(bankAccountAggregate.getEmail())
|
||||
// .address(bankAccountAggregate.getAddress())
|
||||
// .userName(bankAccountAggregate.getUserName())
|
||||
// .balance(bankAccountAggregate.getBalance())
|
||||
// .build();
|
||||
// }
|
||||
public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
|
||||
return new BankAccountResponseDTO(
|
||||
bankAccountDocument.getAggregateId(),
|
||||
bankAccountDocument.getEmail(),
|
||||
bankAccountDocument.getAddress(),
|
||||
bankAccountDocument.getUserName(),
|
||||
bankAccountDocument.getBalance()
|
||||
);
|
||||
}
|
||||
|
||||
public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
|
||||
return BankAccountDocument.builder()
|
||||
.aggregateId(bankAccountAggregate.getId())
|
||||
.email(bankAccountAggregate.getEmail())
|
||||
.address(bankAccountAggregate.getAddress())
|
||||
.userName(bankAccountAggregate.getUserName())
|
||||
.balance(bankAccountAggregate.getBalance())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -27,4 +27,48 @@ logging.level.org.apache.kafka=warn
|
||||
|
||||
microservice.kafka.topics.bank-account-event-store=bank-account-event-store
|
||||
microservice.kafka.groupId=es_microservice
|
||||
microservice.kafka.default-concurrency=10
|
||||
microservice.kafka.default-concurrency=10
|
||||
|
||||
|
||||
spring.data.mongodb.host=localhost
|
||||
spring.data.mongodb.port=27017
|
||||
spring.data.mongodb.authentication-database=admin
|
||||
spring.data.mongodb.username=admin
|
||||
spring.data.mongodb.password=admin
|
||||
spring.data.mongodb.database=microservices
|
||||
|
||||
springdoc.swagger-ui.path=/swagger-ui.html
|
||||
|
||||
management.endpoints.web.exposure.include=health,prometheus,info
|
||||
|
||||
spring.sleuth.propagation.type=w3c,b3
|
||||
spring.sleuth.opentracing.enabled=true
|
||||
spring.zipkin.base-url=http://localhost:9411
|
||||
|
||||
|
||||
resilience4j.retry.instances.microservice.max-attempts=3
|
||||
resilience4j.retry.instances.microservice.waitDuration=1s
|
||||
resilience4j.retry.instances.microservice.enableExponentialBackoff=true
|
||||
resilience4j.retry.instances.microservice.exponentialBackoffMultiplier=2
|
||||
resilience4j.retry.instances.microservice.ignore-exceptions=com.eventsourcing.es.exceptions.AggregateNotFoundException
|
||||
|
||||
|
||||
resilience4j.circuitbreaker.instances.microservice.registerHealthIndicator=true
|
||||
resilience4j.circuitbreaker.instances.microservice.slidingWindowSize=5
|
||||
resilience4j.circuitbreaker.instances.microservice.permittedNumberOfCallsInHalfOpenState=3
|
||||
resilience4j.circuitbreaker.instances.microservice.slidingWindowType=TIME_BASED
|
||||
resilience4j.circuitbreaker.instances.microservice.minimumNumberOfCalls=10
|
||||
resilience4j.circuitbreaker.instances.microservice.waitDurationInOpenState=20s
|
||||
resilience4j.circuitbreaker.instances.microservice.failureRateThreshold=30
|
||||
resilience4j.circuitbreaker.instances.microservice.eventConsumerBufferSize=10
|
||||
|
||||
|
||||
resilience4j.thread-pool-bulkhead.instances.microservice.maxThreadPoolSize=1
|
||||
resilience4j.thread-pool-bulkhead.instances.microservice.coreThreadPoolSize=1
|
||||
resilience4j.thread-pool-bulkhead.instances.microservice.queueCapacity=1
|
||||
|
||||
resilience4j.timelimiter.instances.microservice.timeoutDuration=3s
|
||||
resilience4j.timelimiter.instances.microservice.cancelRunningFuture=true
|
||||
|
||||
spring.jdbc.template.query-timeout=3
|
||||
|
||||
|
||||
Reference in New Issue
Block a user