diff --git a/pom.xml b/pom.xml
index 81db9a2..b8da72a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,10 @@
17
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb
+
org.flywaydb
flyway-core
diff --git a/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java
new file mode 100644
index 0000000..5ceed0a
--- /dev/null
+++ b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java
@@ -0,0 +1,36 @@
+package com.eventsourcing.bankAccount.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.bson.codecs.pojo.annotations.BsonProperty;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.math.BigDecimal;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+@Document(collection = "bankAccounts")
+public class BankAccountDocument {
+
+ @BsonProperty(value = "_id")
+ private String id;
+
+ @BsonProperty(value = "aggregateId")
+ private String aggregateId;
+
+ @BsonProperty(value = "email")
+ private String email;
+
+ @BsonProperty(value = "userName")
+ private String userName;
+
+ @BsonProperty(value = "address")
+ private String address;
+
+ @BsonProperty(value = "balance")
+ private BigDecimal balance;
+}
diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java
index 935edf5..6ed2f34 100644
--- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java
+++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java
@@ -1,32 +1,38 @@
package com.eventsourcing.bankAccount.projection;
+import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
+import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
+import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
import com.eventsourcing.es.Event;
+import com.eventsourcing.es.EventStoreDB;
import com.eventsourcing.es.Projection;
import com.eventsourcing.es.SerializerUtils;
-import com.eventsourcing.exceptions.UnknownEventTypeException;
+import com.eventsourcing.mappers.BankAccountMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
+import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
@Service
@Slf4j
@RequiredArgsConstructor
public class BankAccountMongoProjection implements Projection {
- @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
- private String bankAccountTopicName;
+ private final BankAccountMongoRepository mongoRepository;
+ private final EventStoreDB eventStoreDB;
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
@@ -38,7 +44,7 @@ public class BankAccountMongoProjection implements Projection {
try {
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
- Arrays.stream(events).toList().forEach(this::when);
+ this.processEvents(Arrays.stream(events).toList());
ack.acknowledge();
log.info("ack events: {}", Arrays.toString(events));
} catch (Exception e) {
@@ -48,6 +54,20 @@ public class BankAccountMongoProjection implements Projection {
}
}
+ private void processEvents(List events) {
+ if (events.isEmpty()) return;
+
+ try {
+ events.forEach(this::when);
+ } catch (Exception ex) {
+ mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
+ final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
+ final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
+ final var result = mongoRepository.save(document);
+ log.info("(processEvents) saved document: {}", result);
+ }
+ }
+
@Override
public void when(Event event) {
final var aggregateId = event.getAggregateId();
@@ -62,7 +82,7 @@ public class BankAccountMongoProjection implements Projection {
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
case BalanceDepositedEvent.BALANCE_DEPOSITED ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
- default -> throw new UnknownEventTypeException(event.getEventType());
+ default -> log.error("unknown event type: {}", event.getEventType());
}
}
@@ -70,25 +90,50 @@ public class BankAccountMongoProjection implements Projection {
private void handle(BankAccountCreatedEvent event) {
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
-// final var document = BankAccountDocument.builder()
-// .aggregateId(event.getAggregateId())
-// .email(event.getEmail())
-// .address(event.getAddress())
-// .userName(event.getUserName())
-// .balance(BigDecimal.valueOf(0))
-// .build();
+ final var document = BankAccountDocument.builder()
+ .aggregateId(event.getAggregateId())
+ .email(event.getEmail())
+ .address(event.getAddress())
+ .userName(event.getUserName())
+ .balance(BigDecimal.valueOf(0))
+ .build();
+
+ final var insert = mongoRepository.insert(document);
+ log.info("(BankAccountCreatedEvent) insert: {}", insert);
}
private void handle(EmailChangedEvent event) {
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
+ Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
+ if (documentOptional.isEmpty())
+ throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
+
+ final var document = documentOptional.get();
+ document.setEmail(event.getNewEmail());
+ mongoRepository.save(document);
}
private void handle(AddressUpdatedEvent event) {
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
+ Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
+ if (documentOptional.isEmpty())
+ throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
+
+ final var document = documentOptional.get();
+ document.setAddress(event.getNewAddress());
+ mongoRepository.save(document);
}
private void handle(BalanceDepositedEvent event) {
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
+ Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
+ if (documentOptional.isEmpty())
+ throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
+ final var document = documentOptional.get();
+ final var balance = document.getBalance();
+ final var newBalance = balance.add(event.getAmount());
+ document.setBalance(newBalance);
+ mongoRepository.save(document);
}
}
diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java
index 76ddd6e..13ffbcc 100644
--- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java
+++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java
@@ -1,13 +1,17 @@
package com.eventsourcing.bankAccount.queries;
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
+import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
+import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
import com.eventsourcing.es.EventStoreDB;
import com.eventsourcing.mappers.BankAccountMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
+import java.util.Optional;
+
@Slf4j
@RequiredArgsConstructor
@@ -15,10 +19,19 @@ import org.springframework.stereotype.Service;
public class BankAccountQueryHandler implements BankAccountQueryService {
private final EventStoreDB eventStoreDB;
+ private final BankAccountMongoRepository mongoRepository;
@Override
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
+ Optional optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
+ if (optionalDocument.isPresent()) {
+ return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
+ }
+
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
+ BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
+ log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);
+
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
return bankAccountResponseDTO;
diff --git a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java
new file mode 100644
index 0000000..5fa5e4e
--- /dev/null
+++ b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java
@@ -0,0 +1,13 @@
+package com.eventsourcing.bankAccount.repository;
+
+import com.eventsourcing.bankAccount.domain.BankAccountDocument;
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+public interface BankAccountMongoRepository extends MongoRepository {
+
+ Optional findByAggregateId(String aggregateId);
+
+ void deleteByAggregateId(String aggregateId);
+}
diff --git a/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java b/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java
new file mode 100644
index 0000000..2577143
--- /dev/null
+++ b/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java
@@ -0,0 +1,27 @@
+package com.eventsourcing.configuration;
+
+import com.eventsourcing.bankAccount.domain.BankAccountDocument;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.domain.Sort;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.index.Index;
+
+import javax.annotation.PostConstruct;
+
+@Configuration
+@Slf4j
+@RequiredArgsConstructor
+public class MongoConfiguration {
+
+ private final MongoTemplate mongoTemplate;
+
+ @PostConstruct
+ public void mongoInit() {
+ final var bankAccounts = mongoTemplate.getCollection("bankAccounts");
+ final var aggregateIdIndex = mongoTemplate.indexOps(BankAccountDocument.class).ensureIndex(new Index("aggregateId", Sort.Direction.ASC).unique());
+ final var indexInfo = mongoTemplate.indexOps(BankAccountDocument.class).getIndexInfo();
+ log.info("MongoDB connected, bankAccounts aggregateId index created: {}", indexInfo);
+ }
+}
diff --git a/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java b/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java
index 55c5eb9..507ab9d 100644
--- a/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java
+++ b/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java
@@ -1,6 +1,7 @@
package com.eventsourcing.mappers;
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
+import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
public final class BankAccountMapper {
@@ -19,23 +20,23 @@ public final class BankAccountMapper {
);
}
-// public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
-// return new BankAccountResponseDTO(
-// bankAccountDocument.getAggregateId(),
-// bankAccountDocument.getEmail(),
-// bankAccountDocument.getAddress(),
-// bankAccountDocument.getUserName(),
-// bankAccountDocument.getBalance()
-// );
-// }
-//
-// public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
-// return BankAccountDocument.builder()
-// .aggregateId(bankAccountAggregate.getId())
-// .email(bankAccountAggregate.getEmail())
-// .address(bankAccountAggregate.getAddress())
-// .userName(bankAccountAggregate.getUserName())
-// .balance(bankAccountAggregate.getBalance())
-// .build();
-// }
+ public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
+ return new BankAccountResponseDTO(
+ bankAccountDocument.getAggregateId(),
+ bankAccountDocument.getEmail(),
+ bankAccountDocument.getAddress(),
+ bankAccountDocument.getUserName(),
+ bankAccountDocument.getBalance()
+ );
+ }
+
+ public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
+ return BankAccountDocument.builder()
+ .aggregateId(bankAccountAggregate.getId())
+ .email(bankAccountAggregate.getEmail())
+ .address(bankAccountAggregate.getAddress())
+ .userName(bankAccountAggregate.getUserName())
+ .balance(bankAccountAggregate.getBalance())
+ .build();
+ }
}
\ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index e2f4287..8741636 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -27,4 +27,12 @@ logging.level.org.apache.kafka=warn
microservice.kafka.topics.bank-account-event-store=bank-account-event-store
microservice.kafka.groupId=es_microservice
-microservice.kafka.default-concurrency=10
\ No newline at end of file
+microservice.kafka.default-concurrency=10
+
+
+spring.data.mongodb.host=localhost
+spring.data.mongodb.port=27017
+spring.data.mongodb.authentication-database=admin
+spring.data.mongodb.username=admin
+spring.data.mongodb.password=admin
+spring.data.mongodb.database=microservices