17 Commits

Author SHA1 Message Date
Alexander
5275894580 feature: add jaeger opentracing improvements 2022-04-14 15:30:08 +03:00
Alexander
ea25e8e8bc feature: add jaeger opentracing 2022-04-14 15:18:33 +03:00
Alexander
501b5a9cec feature: add prometheus monitoring 2022-04-14 15:04:44 +03:00
Alexander
a87fde57fe feature: add swagger openapi 2022-04-14 14:52:32 +03:00
Alexander
ff088e891f Merge pull request #2 from AleksK1NG/feature/mongo
Feature/mongo
2022-04-14 14:44:40 +03:00
Alexander
5f9dee9905 Merge remote-tracking branch 'origin/feature/mongo' into feature/mongo 2022-04-14 14:44:01 +03:00
Alexander
3e642c079c feature: add bank account mongo projection improvements update 2022-04-14 14:43:42 +03:00
Alexander
3f6d872444 feature: add bank account mongo projection improvements 2022-04-14 14:43:42 +03:00
Alexander
747d6da0b3 feature: add bank account mongo repository 2022-04-14 14:43:42 +03:00
Alexander
920ce91496 feature: add bank account mongo projection improvements update 2022-04-14 14:42:24 +03:00
Alexander
c513bbfc39 feature: add bank account mongo projection improvements 2022-04-14 12:08:44 +03:00
Alexander
165cd1ebee feature: add bank account mongo repository 2022-04-14 11:59:35 +03:00
Alexander
92df832b48 feature: add bank account request dto validation 2022-04-14 10:48:52 +03:00
Alexander
07282ade74 feature: add bank account mongo projection handle events methods 2022-04-14 10:48:52 +03:00
Alexander
406694ca38 feature: add bank account mongo projection 2022-04-14 10:48:52 +03:00
Alexander
15ca094c01 feature: add kafka event bus 2022-04-14 10:48:52 +03:00
Alexander
3b17fed06c feature: add kafka configuration 2022-04-14 10:48:52 +03:00
12 changed files with 277 additions and 55 deletions

24
pom.xml
View File

@@ -17,6 +17,30 @@
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.8.4</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-ui</artifactId>
<version>1.6.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>

View File

@@ -5,17 +5,20 @@ import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
import com.eventsourcing.es.EventStoreDB;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Slf4j
@Service
public class BankAccountCommandHandler implements BankAccountCommandService{
public class BankAccountCommandHandler implements BankAccountCommandService {
private final EventStoreDB eventStoreDB;
@Override
public String handle(CreateBankAccountCommand command) {
@NewSpan
public String handle(@SpanTag("command") CreateBankAccountCommand command) {
final var aggregate = new BankAccountAggregate(command.aggregateID());
aggregate.createBankAccount(command.email(), command.address(), command.userName());
eventStoreDB.save(aggregate);
@@ -25,7 +28,8 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
}
@Override
public void handle(ChangeEmailCommand command) {
@NewSpan
public void handle(@SpanTag("command") ChangeEmailCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeEmail(command.newEmail());
eventStoreDB.save(aggregate);
@@ -33,7 +37,8 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
}
@Override
public void handle(ChangeAddressCommand command) {
@NewSpan
public void handle(@SpanTag("command") ChangeAddressCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.changeAddress(command.newAddress());
eventStoreDB.save(aggregate);
@@ -41,7 +46,8 @@ public class BankAccountCommandHandler implements BankAccountCommandService{
}
@Override
public void handle(DepositAmountCommand command) {
@NewSpan
public void handle(@SpanTag("command") DepositAmountCommand command) {
final var aggregate = eventStoreDB.load(command.aggregateID(), BankAccountAggregate.class);
aggregate.depositBalance(command.amount());
eventStoreDB.save(aggregate);

View File

@@ -0,0 +1,36 @@
package com.eventsourcing.bankAccount.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.codecs.pojo.annotations.BsonProperty;
import org.springframework.data.mongodb.core.mapping.Document;
import java.math.BigDecimal;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Document(collection = "bankAccounts")
public class BankAccountDocument {
@BsonProperty(value = "_id")
private String id;
@BsonProperty(value = "aggregateId")
private String aggregateId;
@BsonProperty(value = "email")
private String email;
@BsonProperty(value = "userName")
private String userName;
@BsonProperty(value = "address")
private String address;
@BsonProperty(value = "balance")
private BigDecimal balance;
}

View File

@@ -1,32 +1,40 @@
package com.eventsourcing.bankAccount.projection;
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import com.eventsourcing.bankAccount.events.AddressUpdatedEvent;
import com.eventsourcing.bankAccount.events.BalanceDepositedEvent;
import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent;
import com.eventsourcing.bankAccount.events.EmailChangedEvent;
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
import com.eventsourcing.es.Event;
import com.eventsourcing.es.EventStoreDB;
import com.eventsourcing.es.Projection;
import com.eventsourcing.es.SerializerUtils;
import com.eventsourcing.exceptions.UnknownEventTypeException;
import com.eventsourcing.mappers.BankAccountMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@Service
@Slf4j
@RequiredArgsConstructor
public class BankAccountMongoProjection implements Projection {
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
private String bankAccountTopicName;
private final BankAccountMongoRepository mongoRepository;
private final EventStoreDB eventStoreDB;
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
@@ -38,7 +46,7 @@ public class BankAccountMongoProjection implements Projection {
try {
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
Arrays.stream(events).toList().forEach(this::when);
this.processEvents(Arrays.stream(events).toList());
ack.acknowledge();
log.info("ack events: {}", Arrays.toString(events));
} catch (Exception e) {
@@ -48,8 +56,24 @@ public class BankAccountMongoProjection implements Projection {
}
}
@NewSpan
private void processEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
try {
events.forEach(this::when);
} catch (Exception ex) {
mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
final var result = mongoRepository.save(document);
log.info("(processEvents) saved document: {}", result);
}
}
@Override
public void when(Event event) {
@NewSpan
public void when(@SpanTag("event") Event event) {
final var aggregateId = event.getAggregateId();
log.info("(when) >>>>> aggregateId: {}", aggregateId);
@@ -62,33 +86,62 @@ public class BankAccountMongoProjection implements Projection {
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class));
case BalanceDepositedEvent.BALANCE_DEPOSITED ->
handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class));
default -> throw new UnknownEventTypeException(event.getEventType());
default -> log.error("unknown event type: {}", event.getEventType());
}
}
private void handle(BankAccountCreatedEvent event) {
@NewSpan
private void handle(@SpanTag("event") BankAccountCreatedEvent event) {
log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
// final var document = BankAccountDocument.builder()
// .aggregateId(event.getAggregateId())
// .email(event.getEmail())
// .address(event.getAddress())
// .userName(event.getUserName())
// .balance(BigDecimal.valueOf(0))
// .build();
final var document = BankAccountDocument.builder()
.aggregateId(event.getAggregateId())
.email(event.getEmail())
.address(event.getAddress())
.userName(event.getUserName())
.balance(BigDecimal.valueOf(0))
.build();
final var insert = mongoRepository.insert(document);
log.info("(BankAccountCreatedEvent) insert: {}", insert);
}
private void handle(EmailChangedEvent event) {
@NewSpan
private void handle(@SpanTag("event") EmailChangedEvent event) {
log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
final var document = documentOptional.get();
document.setEmail(event.getNewEmail());
mongoRepository.save(document);
}
private void handle(AddressUpdatedEvent event) {
@NewSpan
private void handle(@SpanTag("event") AddressUpdatedEvent event) {
log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
final var document = documentOptional.get();
document.setAddress(event.getNewAddress());
mongoRepository.save(document);
}
private void handle(BalanceDepositedEvent event) {
@NewSpan
private void handle(@SpanTag("event") BalanceDepositedEvent event) {
log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId());
Optional<BankAccountDocument> documentOptional = mongoRepository.findByAggregateId(event.getAggregateId());
if (documentOptional.isEmpty())
throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId());
final var document = documentOptional.get();
final var balance = document.getBalance();
final var newBalance = balance.add(event.getAmount());
document.setBalance(newBalance);
mongoRepository.save(document);
}
}

View File

@@ -1,13 +1,19 @@
package com.eventsourcing.bankAccount.queries;
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository;
import com.eventsourcing.es.EventStoreDB;
import com.eventsourcing.mappers.BankAccountMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@@ -15,10 +21,20 @@ import org.springframework.stereotype.Service;
public class BankAccountQueryHandler implements BankAccountQueryService {
private final EventStoreDB eventStoreDB;
private final BankAccountMongoRepository mongoRepository;
@Override
public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) {
@NewSpan
public BankAccountResponseDTO handle(@SpanTag("query") GetBankAccountByIDQuery query) {
Optional<BankAccountDocument> optionalDocument = mongoRepository.findByAggregateId(query.aggregateID());
if (optionalDocument.isPresent()) {
return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get());
}
final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class);
BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate));
log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument);
final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate);
log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO);
return bankAccountResponseDTO;

View File

@@ -0,0 +1,13 @@
package com.eventsourcing.bankAccount.repository;
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import org.springframework.data.mongodb.repository.MongoRepository;
import java.util.Optional;
public interface BankAccountMongoRepository extends MongoRepository<BankAccountDocument, String> {
Optional<BankAccountDocument> findByAggregateId(String aggregateId);
void deleteByAggregateId(String aggregateId);
}

View File

@@ -0,0 +1,27 @@
package com.eventsourcing.configuration;
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.Index;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
@RequiredArgsConstructor
public class MongoConfiguration {
private final MongoTemplate mongoTemplate;
@PostConstruct
public void mongoInit() {
final var bankAccounts = mongoTemplate.getCollection("bankAccounts");
final var aggregateIdIndex = mongoTemplate.indexOps(BankAccountDocument.class).ensureIndex(new Index("aggregateId", Sort.Direction.ASC).unique());
final var indexInfo = mongoTemplate.indexOps(BankAccountDocument.class).getIndexInfo();
log.info("MongoDB connected, bankAccounts aggregateId index created: {}", indexInfo);
}
}

View File

@@ -0,0 +1,15 @@
package com.eventsourcing.configuration;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Contact;
import io.swagger.v3.oas.annotations.info.Info;
import org.springframework.stereotype.Component;
@OpenAPIDefinition(info = @Info(title = "Spring CQRS and Event Sourcing Microservice",
description = "Spring Postgresql MongoDB Kafka CQRS and Event Sourcing Microservice",
contact = @Contact(name = "Alexander Bryksin", email = "alexander.bryksin@yandex.ru", url = "https://github.com/AleksK1NG")))
@Component
public class SwaggerOpenAPIConfiguration {
}

View File

@@ -4,6 +4,8 @@ package com.eventsourcing.es;
import com.eventsourcing.es.exceptions.AggregateNotFoundException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;
@@ -28,7 +30,8 @@ public class EventStore implements EventStoreDB {
private final EventBus eventBus;
@Override
public void saveEvents(List<Event> events) {
@NewSpan
public void saveEvents(@SpanTag("events") List<Event> events) {
if (events.isEmpty()) return;
final List<Event> changes = new ArrayList<>(events);
@@ -46,7 +49,8 @@ public class EventStore implements EventStoreDB {
}
@Override
public List<Event> loadEvents(String aggregateId, long version) {
@NewSpan
public List<Event> loadEvents(@SpanTag("aggregateId") String aggregateId, @SpanTag("version") long version) {
final List<Event> events = jdbcTemplate.query(LOAD_EVENTS_QUERY, Map.of("aggregate_id", aggregateId, "version", version),
(rs, rowNum) -> Event.builder()
.aggregateId(rs.getString("aggregate_id"))
@@ -62,7 +66,8 @@ public class EventStore implements EventStoreDB {
return events;
}
private <T extends AggregateRoot> void saveSnapshot(T aggregate) {
@NewSpan
private <T extends AggregateRoot> void saveSnapshot(@SpanTag("aggregate") T aggregate) {
aggregate.toSnapshot();
final var snapshot = EventSourcingUtils.snapshotFromAggregate(aggregate);
@@ -78,7 +83,8 @@ public class EventStore implements EventStoreDB {
@Override
@Transactional
public <T extends AggregateRoot> void save(T aggregate) {
@NewSpan
public <T extends AggregateRoot> void save(@SpanTag("aggregate") T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
@@ -95,7 +101,8 @@ public class EventStore implements EventStoreDB {
log.info("(save) saved aggregate: {}", aggregate);
}
private void handleConcurrency(String aggregateId) {
@NewSpan
private void handleConcurrency(@SpanTag("aggregateId") String aggregateId) {
try {
String aggregateID = jdbcTemplate.queryForObject(HANDLE_CONCURRENCY_QUERY, Map.of("aggregate_id", aggregateId), String.class);
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateID);
@@ -105,7 +112,8 @@ public class EventStore implements EventStoreDB {
log.info("(handleConcurrency) aggregateID for lock: {}", aggregateId);
}
private Optional<Snapshot> loadSnapshot(String aggregateId) {
@NewSpan
private Optional<Snapshot> loadSnapshot(@SpanTag("aggregateId") String aggregateId) {
final Optional<Snapshot> snapshot = jdbcTemplate.query(LOAD_SNAPSHOT_QUERY, Map.of("aggregate_id", aggregateId), (rs, rowNum) -> Snapshot.builder()
.aggregateId(rs.getString("aggregate_id"))
.aggregateType(rs.getString("aggregate_type"))
@@ -119,7 +127,8 @@ public class EventStore implements EventStoreDB {
return snapshot;
}
private <T extends AggregateRoot> T getAggregate(final String aggregateId, final Class<T> aggregateType) {
@NewSpan
private <T extends AggregateRoot> T getAggregate(@SpanTag("aggregateId") final String aggregateId, @SpanTag("aggregateType") final Class<T> aggregateType) {
try {
return aggregateType.getConstructor(String.class).newInstance(aggregateId);
} catch (Exception ex) {
@@ -127,7 +136,8 @@ public class EventStore implements EventStoreDB {
}
}
private <T extends AggregateRoot> T getSnapshotFromClass(Optional<Snapshot> snapshot, String aggregateId, Class<T> aggregateType) {
@NewSpan
private <T extends AggregateRoot> T getSnapshotFromClass(@SpanTag("snapshot") Optional<Snapshot> snapshot, @SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
if (snapshot.isEmpty()) {
final var defaultSnapshot = EventSourcingUtils.snapshotFromAggregate(getAggregate(aggregateId, aggregateType));
return EventSourcingUtils.aggregateFromSnapshot(defaultSnapshot, aggregateType);
@@ -137,7 +147,8 @@ public class EventStore implements EventStoreDB {
@Override
@Transactional(readOnly = true)
public <T extends AggregateRoot> T load(String aggregateId, Class<T> aggregateType) {
@NewSpan
public <T extends AggregateRoot> T load(@SpanTag("aggregateId") String aggregateId, @SpanTag("aggregateType") Class<T> aggregateType) {
final Optional<Snapshot> snapshot = this.loadSnapshot(aggregateId);
@@ -156,7 +167,8 @@ public class EventStore implements EventStoreDB {
}
@Override
public Boolean exists(String aggregateId) {
@NewSpan
public Boolean exists(@SpanTag("aggregateId") String aggregateId) {
try {
final var id = jdbcTemplate.queryForObject(EXISTS_QUERY, Map.of("aggregate_id", aggregateId), String.class);
log.info("aggregate exists id: {}", id);

View File

@@ -5,6 +5,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@@ -21,7 +23,8 @@ public class KafkaEventBus implements EventBus {
private String bankAccountTopicName;
@Override
public void publish(List<Event> events) {
@NewSpan
public void publish(@SpanTag("events") List<Event> events) {
final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);

View File

@@ -1,6 +1,7 @@
package com.eventsourcing.mappers;
import com.eventsourcing.bankAccount.domain.BankAccountAggregate;
import com.eventsourcing.bankAccount.domain.BankAccountDocument;
import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO;
public final class BankAccountMapper {
@@ -19,23 +20,23 @@ public final class BankAccountMapper {
);
}
// public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
// return new BankAccountResponseDTO(
// bankAccountDocument.getAggregateId(),
// bankAccountDocument.getEmail(),
// bankAccountDocument.getAddress(),
// bankAccountDocument.getUserName(),
// bankAccountDocument.getBalance()
// );
// }
//
// public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
// return BankAccountDocument.builder()
// .aggregateId(bankAccountAggregate.getId())
// .email(bankAccountAggregate.getEmail())
// .address(bankAccountAggregate.getAddress())
// .userName(bankAccountAggregate.getUserName())
// .balance(bankAccountAggregate.getBalance())
// .build();
// }
public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) {
return new BankAccountResponseDTO(
bankAccountDocument.getAggregateId(),
bankAccountDocument.getEmail(),
bankAccountDocument.getAddress(),
bankAccountDocument.getUserName(),
bankAccountDocument.getBalance()
);
}
public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) {
return BankAccountDocument.builder()
.aggregateId(bankAccountAggregate.getId())
.email(bankAccountAggregate.getEmail())
.address(bankAccountAggregate.getAddress())
.userName(bankAccountAggregate.getUserName())
.balance(bankAccountAggregate.getBalance())
.build();
}
}

View File

@@ -27,4 +27,20 @@ logging.level.org.apache.kafka=warn
microservice.kafka.topics.bank-account-event-store=bank-account-event-store
microservice.kafka.groupId=es_microservice
microservice.kafka.default-concurrency=10
microservice.kafka.default-concurrency=10
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.authentication-database=admin
spring.data.mongodb.username=admin
spring.data.mongodb.password=admin
spring.data.mongodb.database=microservices
springdoc.swagger-ui.path=/swagger-ui.html
management.endpoints.web.exposure.include=health,prometheus,info
spring.sleuth.propagation.type=w3c,b3
spring.sleuth.opentracing.enabled=true
spring.zipkin.base-url=http://localhost:9411