From 35b7348234fc48c89424179639dabacac4588b9b Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 08:48:48 +0300 Subject: [PATCH 01/11] feature: add kafka configuration --- monitoring/prometheus.yml | 17 ++++++ pom.xml | 8 +++ .../configuration/KafkaConfigProperties.java | 26 +++++++++ .../configuration/KafkaConsumerConfig.java | 56 +++++++++++++++++++ .../configuration/KafkaProducerConfig.java | 48 ++++++++++++++++ .../KafkaTopicConfiguration.java | 46 +++++++++++++++ src/main/resources/application.properties | 14 +++++ 7 files changed, 215 insertions(+) create mode 100644 monitoring/prometheus.yml create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml new file mode 100644 index 0000000..45907bc --- /dev/null +++ b/monitoring/prometheus.yml @@ -0,0 +1,17 @@ +global: + scrape_interval: 10s + evaluation_interval: 10s + +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: [ 'localhost:9090' ] + + - job_name: 'system' + static_configs: + - targets: [ 'host.docker.internal:9101' ] + + - job_name: 'microservice' + metrics_path: '/actuator/prometheus' + static_configs: + - targets: [ 'host.docker.internal:8006' ] \ No newline at end of file diff --git a/pom.xml b/pom.xml index c988535..ed323ee 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,14 @@ 17 + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java b/src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java new file mode 100644 index 0000000..142ff09 --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java @@ -0,0 +1,26 @@ +package com.eventsourcing.configuration; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "kafka") +@Data +@AllArgsConstructor +@RequiredArgsConstructor +public class KafkaConfigProperties { + private String acks = "0"; + private String compressionType = "snappy"; + private int retries = 3; + private int deliveryTimeoutMs = 120000; + private int maxRequestSize = 1068576; + private int requestTimeoutMs = 30000; + private String orderMongoProjectionGroupId = "projection-group"; + private String enableAutoCommit = "false"; + private int consumerParallelism = 16; + private int eventsTopicPartitions = 6; + private int eventsTopicReplicationFactor = 1; +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java b/src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java new file mode 100644 index 0000000..d0bb369 --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java @@ -0,0 +1,56 @@ +package com.eventsourcing.configuration; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@RequiredArgsConstructor +@EnableKafka +public class KafkaConsumerConfig { + + @Value(value = "${kafka.bootstrapServers:localhost:9093}") + private String bootstrapServers; + + private final KafkaConfigProperties kafkaConfigProperties; + + @Bean + public ConcurrentKafkaListenerContainerFactory + kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(Runtime.getRuntime().availableProcessors()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L))); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerProps()); + } + + private Map consumerProps() { + final Map consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getOrderMongoProjectionGroupId()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfigProperties.getEnableAutoCommit()); + return consumerProps; + } +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java b/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java new file mode 100644 index 0000000..d1e3eac --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java @@ -0,0 +1,48 @@ +package com.eventsourcing.configuration; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@RequiredArgsConstructor +public class KafkaProducerConfig { + @Value(value = "${kafka.bootstrapServers:localhost:9093}") + private String bootstrapServers; + + private final KafkaConfigProperties kafkaConfigProperties; + + private Map senderProps() { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks()); +// producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfigProperties.getCompressionType()); + producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries()); + producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getDeliveryTimeoutMs()); + producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfigProperties.getMaxRequestSize()); + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getRequestTimeoutMs()); + return producerProps; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(senderProps()); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } +} \ No newline at end of file diff --git a/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java new file mode 100644 index 0000000..84d98ab --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java @@ -0,0 +1,46 @@ +package com.eventsourcing.configuration; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +import java.util.HashMap; +import java.util.Map; + + +@Configuration +@Slf4j +@RequiredArgsConstructor +public class KafkaTopicConfiguration { + + @Value(value = "${kafka.bootstrapServers:localhost:9093}") + private String bootstrapServers; + + @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") + private String bankAccountTopicName; + + @Bean + public KafkaAdmin kafkaAdmin() { + final Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic bankAccountEventStoreTopicInitializer(KafkaAdmin kafkaAdmin) { + try { + final var topic = new NewTopic(bankAccountTopicName, 3, (short) 1); + kafkaAdmin.createOrModifyTopics(topic); + log.info("(bankAccountEventStoreTopicInitializer) topic: {}", topic); + return topic; + } catch (Exception e) { + log.error(e.getMessage()); + return null; + } + } +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 23a8695..60a3792 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,6 +2,8 @@ spring.application.name=microservice server.port=8006 server.tomcat.accesslog.enabled=true +logging.pattern.console=%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(line:%L) %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx} + spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.PostgreSQLDialect spring.jpa.hibernate.ddl-auto=none @@ -14,3 +16,15 @@ spring.jpa.generate-ddl=false spring.datasource.hikari.minimum-idle=10 spring.datasource.hikari.maximum-pool-size=30 + + +spring.kafka.bootstrap-servers=localhost:9093 +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.StringOrBytesSerializer +spring.kafka.consumer.group-id=microservice + +logging.level.org.apache.kafka=warn + +order.kafka.topics.bank-account-event-store=bank-account-event-store +order.kafka.groupId=es_microservice +order.kafka.default-concurrency=10 \ No newline at end of file From 0e1f096542db62e29e0becb35b81adb913df28ad Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 09:56:46 +0300 Subject: [PATCH 02/11] feature: add kafka event bus --- pom.xml | 4 ++ .../BankAccountMongoProjection.java | 41 ++++++++++++++++++ .../KafkaTopicConfiguration.java | 2 +- .../java/com/eventsourcing/es/EventStore.java | 5 +++ .../com/eventsourcing/es/KafkaEventBus.java | 37 ++++++++++++++++ src/main/resources/application.properties | 6 +-- .../db/migration/V1__initial_setup.sql | 43 +++++++++++++++++++ 7 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java create mode 100644 src/main/java/com/eventsourcing/es/KafkaEventBus.java create mode 100644 src/main/resources/db/migration/V1__initial_setup.sql diff --git a/pom.xml b/pom.xml index ed323ee..81db9a2 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,10 @@ 17 + + org.flywaydb + flyway-core + org.springframework.boot spring-boot-starter-validation diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java new file mode 100644 index 0000000..98d9f06 --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -0,0 +1,41 @@ +package com.eventsourcing.bankAccount.projection; + + +import com.eventsourcing.es.Event; +import com.eventsourcing.es.SerializerUtils; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +import java.util.Arrays; + +@Service +@Slf4j +@RequiredArgsConstructor +public class BankAccountMongoProjection { + + @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") + private String bankAccountTopicName; + + + @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"}, + groupId = "${microservice.kafka.groupId}", + concurrency = "${microservice.kafka.default-concurrency}") + public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) { + log.info("(BankAccountMongoProjection) data: {}", new String(data)); + + try { + final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); + ack.acknowledge(); + log.info("ack events: {}", Arrays.toString(events)); + } catch (Exception e) { + ack.nack(100); + log.error("bankAccountMongoProjectionListener: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java index 84d98ab..6f33442 100644 --- a/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java +++ b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java @@ -21,7 +21,7 @@ public class KafkaTopicConfiguration { @Value(value = "${kafka.bootstrapServers:localhost:9093}") private String bootstrapServers; - @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") + @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @Bean diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index b7c5054..a1f3f2c 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -25,6 +25,7 @@ public class EventStore implements EventStoreDB { private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id"; private final NamedParameterJdbcTemplate jdbcTemplate; + private final EventBus eventBus; @Override public void saveEvents(List events) { @@ -78,6 +79,8 @@ public class EventStore implements EventStoreDB { @Override @Transactional public void save(T aggregate) { + final List aggregateEvents = new ArrayList<>(aggregate.getChanges()); + if (aggregate.getVersion() > 1) { this.handleConcurrency(aggregate.getId()); } @@ -87,6 +90,8 @@ public class EventStore implements EventStoreDB { this.saveSnapshot(aggregate); } + eventBus.publish(aggregateEvents); + log.info("(save) saved aggregate: {}", aggregate); } diff --git a/src/main/java/com/eventsourcing/es/KafkaEventBus.java b/src/main/java/com/eventsourcing/es/KafkaEventBus.java new file mode 100644 index 0000000..68b0ce4 --- /dev/null +++ b/src/main/java/com/eventsourcing/es/KafkaEventBus.java @@ -0,0 +1,37 @@ +package com.eventsourcing.es; + + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Service +@Slf4j +@RequiredArgsConstructor +public class KafkaEventBus implements EventBus { + private final KafkaTemplate kafkaTemplate; + + @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") + private String bankAccountTopicName; + + @Override + public void publish(List events) { + final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{})); + final ProducerRecord record = new ProducerRecord<>(bankAccountTopicName, eventsBytes); + + try { + kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS); + log.info("publishing kafka record value >>>>> {}", new String(record.value())); + + } catch (Exception ex) { + log.error("(KafkaEventBus) publish get", ex); + throw new RuntimeException(ex); + } + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 60a3792..e2f4287 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -25,6 +25,6 @@ spring.kafka.consumer.group-id=microservice logging.level.org.apache.kafka=warn -order.kafka.topics.bank-account-event-store=bank-account-event-store -order.kafka.groupId=es_microservice -order.kafka.default-concurrency=10 \ No newline at end of file +microservice.kafka.topics.bank-account-event-store=bank-account-event-store +microservice.kafka.groupId=es_microservice +microservice.kafka.default-concurrency=10 \ No newline at end of file diff --git a/src/main/resources/db/migration/V1__initial_setup.sql b/src/main/resources/db/migration/V1__initial_setup.sql new file mode 100644 index 0000000..e21ddfb --- /dev/null +++ b/src/main/resources/db/migration/V1__initial_setup.sql @@ -0,0 +1,43 @@ +CREATE EXTENSION IF NOT EXISTS citext; +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +DROP TABLE IF EXISTS events CASCADE; +DROP TABLE IF EXISTS snapshots CASCADE; + + +CREATE TABLE IF NOT EXISTS events +( + event_id UUID DEFAULT uuid_generate_v4(), + aggregate_id VARCHAR(250) NOT NULL CHECK ( aggregate_id <> '' ), + aggregate_type VARCHAR(250) NOT NULL CHECK ( aggregate_type <> '' ), + event_type VARCHAR(250) NOT NULL CHECK ( event_type <> '' ), + data BYTEA, + metadata BYTEA, + version SERIAL NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + UNIQUE (aggregate_id, version) +) PARTITION BY HASH (aggregate_id); + +CREATE INDEX IF NOT EXISTS aggregate_id_aggregate_version_idx ON events USING btree (aggregate_id, version ASC); + +CREATE TABLE IF NOT EXISTS events_partition_hash_1 PARTITION OF events + FOR VALUES WITH (MODULUS 3, REMAINDER 0); + +CREATE TABLE IF NOT EXISTS events_partition_hash_2 PARTITION OF events + FOR VALUES WITH (MODULUS 3, REMAINDER 1); + +CREATE TABLE IF NOT EXISTS events_partition_hash_3 PARTITION OF events + FOR VALUES WITH (MODULUS 3, REMAINDER 2); + +CREATE TABLE IF NOT EXISTS snapshots +( + snapshot_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + aggregate_id VARCHAR(250) UNIQUE NOT NULL CHECK ( aggregate_id <> '' ), + aggregate_type VARCHAR(250) NOT NULL CHECK ( aggregate_type <> '' ), + data BYTEA, + metadata BYTEA, + version SERIAL NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + UNIQUE (aggregate_id) +); + +CREATE INDEX IF NOT EXISTS aggregate_id_aggregate_version_idx ON snapshots USING btree (aggregate_id, version); \ No newline at end of file From fbab3a0f230522d790becfae4c76e95b03359c81 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 10:20:51 +0300 Subject: [PATCH 03/11] feature: add bank account mongo projection --- .../bankAccount/projection/BankAccountMongoProjection.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 98d9f06..811544d 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -27,6 +27,7 @@ public class BankAccountMongoProjection { groupId = "${microservice.kafka.groupId}", concurrency = "${microservice.kafka.default-concurrency}") public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) { + log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp()); log.info("(BankAccountMongoProjection) data: {}", new String(data)); try { @@ -35,6 +36,7 @@ public class BankAccountMongoProjection { log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { ack.nack(100); + log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp()); log.error("bankAccountMongoProjectionListener: {}", e.getMessage()); } } From e11014dfb7d009d693bf1b06d77e01ed456b73bc Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 10:33:21 +0300 Subject: [PATCH 04/11] feature: add bank account mongo projection handle events methods --- .../BankAccountMongoProjection.java | 53 ++++++++++++++++++- .../exceptions/UnknownEventTypeException.java | 10 ++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/eventsourcing/exceptions/UnknownEventTypeException.java diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 811544d..935edf5 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -1,8 +1,14 @@ package com.eventsourcing.bankAccount.projection; +import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; +import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; +import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent; +import com.eventsourcing.bankAccount.events.EmailChangedEvent; import com.eventsourcing.es.Event; +import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; +import com.eventsourcing.exceptions.UnknownEventTypeException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -17,7 +23,7 @@ import java.util.Arrays; @Service @Slf4j @RequiredArgsConstructor -public class BankAccountMongoProjection { +public class BankAccountMongoProjection implements Projection { @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @@ -32,6 +38,7 @@ public class BankAccountMongoProjection { try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); + Arrays.stream(events).toList().forEach(this::when); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { @@ -40,4 +47,48 @@ public class BankAccountMongoProjection { log.error("bankAccountMongoProjectionListener: {}", e.getMessage()); } } + + @Override + public void when(Event event) { + final var aggregateId = event.getAggregateId(); + log.info("(when) >>>>> aggregateId: {}", aggregateId); + + switch (event.getEventType()) { + case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 -> + handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class)); + case EmailChangedEvent.EMAIL_CHANGED_V1 -> + handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class)); + case AddressUpdatedEvent.ADDRESS_UPDATED_V1 -> + handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class)); + case BalanceDepositedEvent.BALANCE_DEPOSITED -> + handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class)); + default -> throw new UnknownEventTypeException(event.getEventType()); + } + } + + + private void handle(BankAccountCreatedEvent event) { + log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + +// final var document = BankAccountDocument.builder() +// .aggregateId(event.getAggregateId()) +// .email(event.getEmail()) +// .address(event.getAddress()) +// .userName(event.getUserName()) +// .balance(BigDecimal.valueOf(0)) +// .build(); + } + + private void handle(EmailChangedEvent event) { + log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + } + + private void handle(AddressUpdatedEvent event) { + log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + } + + private void handle(BalanceDepositedEvent event) { + log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + + } } diff --git a/src/main/java/com/eventsourcing/exceptions/UnknownEventTypeException.java b/src/main/java/com/eventsourcing/exceptions/UnknownEventTypeException.java new file mode 100644 index 0000000..d53dc55 --- /dev/null +++ b/src/main/java/com/eventsourcing/exceptions/UnknownEventTypeException.java @@ -0,0 +1,10 @@ +package com.eventsourcing.exceptions; + +public class UnknownEventTypeException extends RuntimeException { + public UnknownEventTypeException() { + } + + public UnknownEventTypeException(String eventType) { + super("unknown event type: " + eventType); + } +} From 5684cc094e85c834910438adfd77ae5fe50b12ec Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 10:47:19 +0300 Subject: [PATCH 05/11] feature: add bank account request dto validation --- .../bankAccount/delivery/BankAccountController.java | 9 +++++---- .../bankAccount/dto/ChangeAddressRequestDTO.java | 5 ++++- .../bankAccount/dto/ChangeEmailRequestDTO.java | 6 +++++- .../bankAccount/dto/CreateBankAccountRequestDTO.java | 10 +++++++--- .../bankAccount/dto/DepositAmountRequestDTO.java | 4 +++- 5 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java b/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java index 152d4e6..8806a54 100644 --- a/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java +++ b/src/main/java/com/eventsourcing/bankAccount/delivery/BankAccountController.java @@ -11,6 +11,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import javax.validation.Valid; import java.util.UUID; @RestController @@ -32,7 +33,7 @@ public class BankAccountController { } @PostMapping - public ResponseEntity createBankAccount(@RequestBody CreateBankAccountRequestDTO dto) { + public ResponseEntity createBankAccount(@Valid @RequestBody CreateBankAccountRequestDTO dto) { final var aggregateID = UUID.randomUUID().toString(); final var command = new CreateBankAccountCommand(aggregateID, dto.email(), dto.userName(), dto.address()); final var id = commandService.handle(command); @@ -41,7 +42,7 @@ public class BankAccountController { } @PostMapping(path = "/deposit/{aggregateId}") - public ResponseEntity depositAmount(@RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) { + public ResponseEntity depositAmount(@Valid @RequestBody DepositAmountRequestDTO dto, @PathVariable String aggregateId) { final var command = new DepositAmountCommand(aggregateId, dto.amount()); commandService.handle(command); log.info("DepositAmountCommand command: {}", command); @@ -49,7 +50,7 @@ public class BankAccountController { } @PostMapping(path = "/email/{aggregateId}") - public ResponseEntity changeEmail(@RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) { + public ResponseEntity changeEmail(@Valid @RequestBody ChangeEmailRequestDTO dto, @PathVariable String aggregateId) { final var command = new ChangeEmailCommand(aggregateId, dto.newEmail()); commandService.handle(command); log.info("ChangeEmailCommand command: {}", command); @@ -57,7 +58,7 @@ public class BankAccountController { } @PostMapping(path = "/address/{aggregateId}") - public ResponseEntity changeAddress(@RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) { + public ResponseEntity changeAddress(@Valid @RequestBody ChangeAddressRequestDTO dto, @PathVariable String aggregateId) { final var command = new ChangeAddressCommand(aggregateId, dto.newAddress()); commandService.handle(command); log.info("changeAddress command: {}", command); diff --git a/src/main/java/com/eventsourcing/bankAccount/dto/ChangeAddressRequestDTO.java b/src/main/java/com/eventsourcing/bankAccount/dto/ChangeAddressRequestDTO.java index d017574..49759c7 100644 --- a/src/main/java/com/eventsourcing/bankAccount/dto/ChangeAddressRequestDTO.java +++ b/src/main/java/com/eventsourcing/bankAccount/dto/ChangeAddressRequestDTO.java @@ -1,4 +1,7 @@ package com.eventsourcing.bankAccount.dto; -public record ChangeAddressRequestDTO(String newAddress) { +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Size; + +public record ChangeAddressRequestDTO(@NotBlank @Size(min = 10, max = 250) String newAddress) { } diff --git a/src/main/java/com/eventsourcing/bankAccount/dto/ChangeEmailRequestDTO.java b/src/main/java/com/eventsourcing/bankAccount/dto/ChangeEmailRequestDTO.java index 4e2be91..bf54a58 100644 --- a/src/main/java/com/eventsourcing/bankAccount/dto/ChangeEmailRequestDTO.java +++ b/src/main/java/com/eventsourcing/bankAccount/dto/ChangeEmailRequestDTO.java @@ -1,4 +1,8 @@ package com.eventsourcing.bankAccount.dto; -public record ChangeEmailRequestDTO(String newEmail) { +import javax.validation.constraints.Email; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Size; + +public record ChangeEmailRequestDTO(@Email @NotBlank @Size(min = 10, max = 250) String newEmail) { } diff --git a/src/main/java/com/eventsourcing/bankAccount/dto/CreateBankAccountRequestDTO.java b/src/main/java/com/eventsourcing/bankAccount/dto/CreateBankAccountRequestDTO.java index 097c08e..08fdcc7 100644 --- a/src/main/java/com/eventsourcing/bankAccount/dto/CreateBankAccountRequestDTO.java +++ b/src/main/java/com/eventsourcing/bankAccount/dto/CreateBankAccountRequestDTO.java @@ -1,7 +1,11 @@ package com.eventsourcing.bankAccount.dto; +import javax.validation.constraints.Email; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Size; + public record CreateBankAccountRequestDTO( - String email, - String address, - String userName) { + @Email @NotBlank @Size(min = 10, max = 250) String email, + @NotBlank @Size(min = 10, max = 250) String address, + @NotBlank @Size(min = 10, max = 250) String userName) { } diff --git a/src/main/java/com/eventsourcing/bankAccount/dto/DepositAmountRequestDTO.java b/src/main/java/com/eventsourcing/bankAccount/dto/DepositAmountRequestDTO.java index 499d0b1..1fc73d6 100644 --- a/src/main/java/com/eventsourcing/bankAccount/dto/DepositAmountRequestDTO.java +++ b/src/main/java/com/eventsourcing/bankAccount/dto/DepositAmountRequestDTO.java @@ -1,6 +1,8 @@ package com.eventsourcing.bankAccount.dto; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; import java.math.BigDecimal; -public record DepositAmountRequestDTO(BigDecimal amount) { +public record DepositAmountRequestDTO(@Min(value = 300, message = "minimal amount is 300") @NotNull BigDecimal amount) { } \ No newline at end of file From 165cd1ebee714595f336197a9975ae3ebae236fa Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 11:59:35 +0300 Subject: [PATCH 06/11] feature: add bank account mongo repository --- pom.xml | 4 ++ .../domain/BankAccountDocument.java | 36 ++++++++++++ .../BankAccountMongoProjection.java | 58 +++++++++++++++---- .../queries/BankAccountQueryHandler.java | 13 +++++ .../BankAccountMongoRepository.java | 11 ++++ .../configuration/MongoConfiguration.java | 27 +++++++++ .../mappers/BankAccountMapper.java | 39 +++++++------ src/main/resources/application.properties | 10 +++- 8 files changed, 168 insertions(+), 30 deletions(-) create mode 100644 src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java create mode 100644 src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java create mode 100644 src/main/java/com/eventsourcing/configuration/MongoConfiguration.java diff --git a/pom.xml b/pom.xml index 81db9a2..b8da72a 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,10 @@ 17 + + org.springframework.boot + spring-boot-starter-data-mongodb + org.flywaydb flyway-core diff --git a/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java new file mode 100644 index 0000000..5ceed0a --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java @@ -0,0 +1,36 @@ +package com.eventsourcing.bankAccount.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.bson.codecs.pojo.annotations.BsonProperty; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.math.BigDecimal; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Document(collection = "bankAccounts") +public class BankAccountDocument { + + @BsonProperty(value = "_id") + private String id; + + @BsonProperty(value = "aggregateId") + private String aggregateId; + + @BsonProperty(value = "email") + private String email; + + @BsonProperty(value = "userName") + private String userName; + + @BsonProperty(value = "address") + private String address; + + @BsonProperty(value = "balance") + private BigDecimal balance; +} diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 935edf5..fe887b1 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -1,14 +1,16 @@ package com.eventsourcing.bankAccount.projection; +import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent; import com.eventsourcing.bankAccount.events.EmailChangedEvent; +import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository; import com.eventsourcing.es.Event; +import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; -import com.eventsourcing.exceptions.UnknownEventTypeException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -18,13 +20,19 @@ import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; +import java.math.BigDecimal; import java.util.Arrays; +import java.util.List; +import java.util.Optional; @Service @Slf4j @RequiredArgsConstructor public class BankAccountMongoProjection implements Projection { + private final BankAccountMongoRepository mongoRepository; + private final EventStoreDB eventStoreDB; + @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @@ -38,7 +46,7 @@ public class BankAccountMongoProjection implements Projection { try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); - Arrays.stream(events).toList().forEach(this::when); + this.processEvents( Arrays.stream(events).toList()); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { @@ -48,6 +56,14 @@ public class BankAccountMongoProjection implements Projection { } } + private void processEvents(List events) { + try { + events.forEach(event -> this.when(event)); + } catch (Exception ex) { + // TODO: delete from mongo, get from eventStore, upsert to mongo + } + } + @Override public void when(Event event) { final var aggregateId = event.getAggregateId(); @@ -62,7 +78,7 @@ public class BankAccountMongoProjection implements Projection { handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class)); case BalanceDepositedEvent.BALANCE_DEPOSITED -> handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class)); - default -> throw new UnknownEventTypeException(event.getEventType()); + default -> log.error("unknown event type: {}", event.getEventType()); } } @@ -70,25 +86,47 @@ public class BankAccountMongoProjection implements Projection { private void handle(BankAccountCreatedEvent event) { log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); -// final var document = BankAccountDocument.builder() -// .aggregateId(event.getAggregateId()) -// .email(event.getEmail()) -// .address(event.getAddress()) -// .userName(event.getUserName()) -// .balance(BigDecimal.valueOf(0)) -// .build(); + final var document = BankAccountDocument.builder() + .aggregateId(event.getAggregateId()) + .email(event.getEmail()) + .address(event.getAddress()) + .userName(event.getUserName()) + .balance(BigDecimal.valueOf(0)) + .build(); + + final var insert = mongoRepository.insert(document); + log.info("(BankAccountCreatedEvent) insert: {}", insert); } private void handle(EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + + final var document = documentOptional.get(); + document.setEmail(event.getNewEmail()); + mongoRepository.save(document); } private void handle(AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + + final var document = documentOptional.get(); + document.setAddress(event.getNewAddress()); + mongoRepository.save(document); } private void handle(BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + final var document = documentOptional.get(); + final var balance = document.getBalance(); + final var newBalance = balance.add(event.getAmount()); + document.setBalance(newBalance); + mongoRepository.save(document); } } diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java index 76ddd6e..13ffbcc 100644 --- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java @@ -1,13 +1,17 @@ package com.eventsourcing.bankAccount.queries; import com.eventsourcing.bankAccount.domain.BankAccountAggregate; +import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO; +import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository; import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.Optional; + @Slf4j @RequiredArgsConstructor @@ -15,10 +19,19 @@ import org.springframework.stereotype.Service; public class BankAccountQueryHandler implements BankAccountQueryService { private final EventStoreDB eventStoreDB; + private final BankAccountMongoRepository mongoRepository; @Override public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) { + Optional optionalDocument = mongoRepository.findByAggregateId(query.aggregateID()); + if (optionalDocument.isPresent()) { + return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get()); + } + final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class); + BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate)); + log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument); + final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate); log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO); return bankAccountResponseDTO; diff --git a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java new file mode 100644 index 0000000..68740dc --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java @@ -0,0 +1,11 @@ +package com.eventsourcing.bankAccount.repository; + +import com.eventsourcing.bankAccount.domain.BankAccountDocument; +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + +public interface BankAccountMongoRepository extends MongoRepository { + + Optional findByAggregateId(String aggregateId); +} diff --git a/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java b/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java new file mode 100644 index 0000000..2577143 --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java @@ -0,0 +1,27 @@ +package com.eventsourcing.configuration; + +import com.eventsourcing.bankAccount.domain.BankAccountDocument; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.index.Index; + +import javax.annotation.PostConstruct; + +@Configuration +@Slf4j +@RequiredArgsConstructor +public class MongoConfiguration { + + private final MongoTemplate mongoTemplate; + + @PostConstruct + public void mongoInit() { + final var bankAccounts = mongoTemplate.getCollection("bankAccounts"); + final var aggregateIdIndex = mongoTemplate.indexOps(BankAccountDocument.class).ensureIndex(new Index("aggregateId", Sort.Direction.ASC).unique()); + final var indexInfo = mongoTemplate.indexOps(BankAccountDocument.class).getIndexInfo(); + log.info("MongoDB connected, bankAccounts aggregateId index created: {}", indexInfo); + } +} diff --git a/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java b/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java index 55c5eb9..507ab9d 100644 --- a/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java +++ b/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java @@ -1,6 +1,7 @@ package com.eventsourcing.mappers; import com.eventsourcing.bankAccount.domain.BankAccountAggregate; +import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO; public final class BankAccountMapper { @@ -19,23 +20,23 @@ public final class BankAccountMapper { ); } -// public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) { -// return new BankAccountResponseDTO( -// bankAccountDocument.getAggregateId(), -// bankAccountDocument.getEmail(), -// bankAccountDocument.getAddress(), -// bankAccountDocument.getUserName(), -// bankAccountDocument.getBalance() -// ); -// } -// -// public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) { -// return BankAccountDocument.builder() -// .aggregateId(bankAccountAggregate.getId()) -// .email(bankAccountAggregate.getEmail()) -// .address(bankAccountAggregate.getAddress()) -// .userName(bankAccountAggregate.getUserName()) -// .balance(bankAccountAggregate.getBalance()) -// .build(); -// } + public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) { + return new BankAccountResponseDTO( + bankAccountDocument.getAggregateId(), + bankAccountDocument.getEmail(), + bankAccountDocument.getAddress(), + bankAccountDocument.getUserName(), + bankAccountDocument.getBalance() + ); + } + + public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) { + return BankAccountDocument.builder() + .aggregateId(bankAccountAggregate.getId()) + .email(bankAccountAggregate.getEmail()) + .address(bankAccountAggregate.getAddress()) + .userName(bankAccountAggregate.getUserName()) + .balance(bankAccountAggregate.getBalance()) + .build(); + } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e2f4287..8741636 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -27,4 +27,12 @@ logging.level.org.apache.kafka=warn microservice.kafka.topics.bank-account-event-store=bank-account-event-store microservice.kafka.groupId=es_microservice -microservice.kafka.default-concurrency=10 \ No newline at end of file +microservice.kafka.default-concurrency=10 + + +spring.data.mongodb.host=localhost +spring.data.mongodb.port=27017 +spring.data.mongodb.authentication-database=admin +spring.data.mongodb.username=admin +spring.data.mongodb.password=admin +spring.data.mongodb.database=microservices From c513bbfc3988511d9dd3d84f09444853ec2536fa Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 12:08:44 +0300 Subject: [PATCH 07/11] feature: add bank account mongo projection improvements --- .../BankAccountMongoProjection.java | 20 ++++++++++++++----- .../BankAccountMongoRepository.java | 2 ++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index fe887b1..f01015f 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -1,6 +1,7 @@ package com.eventsourcing.bankAccount.projection; +import com.eventsourcing.bankAccount.domain.BankAccountAggregate; import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; @@ -11,6 +12,7 @@ import com.eventsourcing.es.Event; import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; +import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -46,7 +48,7 @@ public class BankAccountMongoProjection implements Projection { try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); - this.processEvents( Arrays.stream(events).toList()); + this.processEvents(Arrays.stream(events).toList()); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { @@ -58,9 +60,14 @@ public class BankAccountMongoProjection implements Projection { private void processEvents(List events) { try { - events.forEach(event -> this.when(event)); + events.forEach(this::when); } catch (Exception ex) { // TODO: delete from mongo, get from eventStore, upsert to mongo + mongoRepository.deleteByAggregateId(events.get(0).getAggregateId()); + final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class); + final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate); + BankAccountDocument result = mongoRepository.save(document); + log.info("(processEvents) saved document: {}", result); } } @@ -101,7 +108,8 @@ public class BankAccountMongoProjection implements Projection { private void handle(EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); - if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + if (documentOptional.isEmpty()) + throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); final var document = documentOptional.get(); document.setEmail(event.getNewEmail()); @@ -111,7 +119,8 @@ public class BankAccountMongoProjection implements Projection { private void handle(AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); - if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + if (documentOptional.isEmpty()) + throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); final var document = documentOptional.get(); document.setAddress(event.getNewAddress()); @@ -121,7 +130,8 @@ public class BankAccountMongoProjection implements Projection { private void handle(BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); - if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + if (documentOptional.isEmpty()) + throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); final var document = documentOptional.get(); final var balance = document.getBalance(); diff --git a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java index 68740dc..5fa5e4e 100644 --- a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java +++ b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java @@ -8,4 +8,6 @@ import java.util.Optional; public interface BankAccountMongoRepository extends MongoRepository { Optional findByAggregateId(String aggregateId); + + void deleteByAggregateId(String aggregateId); } From 920ce9149622268b5895255c188ab76b66ca7875 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 14:42:24 +0300 Subject: [PATCH 08/11] feature: add bank account mongo projection improvements update --- .../projection/BankAccountMongoProjection.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index f01015f..6ed2f34 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -15,7 +15,6 @@ import com.eventsourcing.es.SerializerUtils; import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.Acknowledgment; @@ -35,9 +34,6 @@ public class BankAccountMongoProjection implements Projection { private final BankAccountMongoRepository mongoRepository; private final EventStoreDB eventStoreDB; - @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") - private String bankAccountTopicName; - @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"}, groupId = "${microservice.kafka.groupId}", @@ -59,14 +55,15 @@ public class BankAccountMongoProjection implements Projection { } private void processEvents(List events) { + if (events.isEmpty()) return; + try { events.forEach(this::when); } catch (Exception ex) { - // TODO: delete from mongo, get from eventStore, upsert to mongo mongoRepository.deleteByAggregateId(events.get(0).getAggregateId()); final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class); final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate); - BankAccountDocument result = mongoRepository.save(document); + final var result = mongoRepository.save(document); log.info("(processEvents) saved document: {}", result); } } From 747d6da0b3729d2bfc861e01b253550532564f16 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 11:59:35 +0300 Subject: [PATCH 09/11] feature: add bank account mongo repository --- pom.xml | 4 ++ .../domain/BankAccountDocument.java | 36 ++++++++++++ .../BankAccountMongoProjection.java | 58 +++++++++++++++---- .../queries/BankAccountQueryHandler.java | 13 +++++ .../BankAccountMongoRepository.java | 11 ++++ .../configuration/MongoConfiguration.java | 27 +++++++++ .../mappers/BankAccountMapper.java | 39 +++++++------ src/main/resources/application.properties | 10 +++- 8 files changed, 168 insertions(+), 30 deletions(-) create mode 100644 src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java create mode 100644 src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java create mode 100644 src/main/java/com/eventsourcing/configuration/MongoConfiguration.java diff --git a/pom.xml b/pom.xml index 81db9a2..b8da72a 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,10 @@ 17 + + org.springframework.boot + spring-boot-starter-data-mongodb + org.flywaydb flyway-core diff --git a/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java new file mode 100644 index 0000000..5ceed0a --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/domain/BankAccountDocument.java @@ -0,0 +1,36 @@ +package com.eventsourcing.bankAccount.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.bson.codecs.pojo.annotations.BsonProperty; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.math.BigDecimal; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Document(collection = "bankAccounts") +public class BankAccountDocument { + + @BsonProperty(value = "_id") + private String id; + + @BsonProperty(value = "aggregateId") + private String aggregateId; + + @BsonProperty(value = "email") + private String email; + + @BsonProperty(value = "userName") + private String userName; + + @BsonProperty(value = "address") + private String address; + + @BsonProperty(value = "balance") + private BigDecimal balance; +} diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index 935edf5..fe887b1 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -1,14 +1,16 @@ package com.eventsourcing.bankAccount.projection; +import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent; import com.eventsourcing.bankAccount.events.EmailChangedEvent; +import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository; import com.eventsourcing.es.Event; +import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; -import com.eventsourcing.exceptions.UnknownEventTypeException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -18,13 +20,19 @@ import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; +import java.math.BigDecimal; import java.util.Arrays; +import java.util.List; +import java.util.Optional; @Service @Slf4j @RequiredArgsConstructor public class BankAccountMongoProjection implements Projection { + private final BankAccountMongoRepository mongoRepository; + private final EventStoreDB eventStoreDB; + @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @@ -38,7 +46,7 @@ public class BankAccountMongoProjection implements Projection { try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); - Arrays.stream(events).toList().forEach(this::when); + this.processEvents( Arrays.stream(events).toList()); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { @@ -48,6 +56,14 @@ public class BankAccountMongoProjection implements Projection { } } + private void processEvents(List events) { + try { + events.forEach(event -> this.when(event)); + } catch (Exception ex) { + // TODO: delete from mongo, get from eventStore, upsert to mongo + } + } + @Override public void when(Event event) { final var aggregateId = event.getAggregateId(); @@ -62,7 +78,7 @@ public class BankAccountMongoProjection implements Projection { handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class)); case BalanceDepositedEvent.BALANCE_DEPOSITED -> handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class)); - default -> throw new UnknownEventTypeException(event.getEventType()); + default -> log.error("unknown event type: {}", event.getEventType()); } } @@ -70,25 +86,47 @@ public class BankAccountMongoProjection implements Projection { private void handle(BankAccountCreatedEvent event) { log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); -// final var document = BankAccountDocument.builder() -// .aggregateId(event.getAggregateId()) -// .email(event.getEmail()) -// .address(event.getAddress()) -// .userName(event.getUserName()) -// .balance(BigDecimal.valueOf(0)) -// .build(); + final var document = BankAccountDocument.builder() + .aggregateId(event.getAggregateId()) + .email(event.getEmail()) + .address(event.getAddress()) + .userName(event.getUserName()) + .balance(BigDecimal.valueOf(0)) + .build(); + + final var insert = mongoRepository.insert(document); + log.info("(BankAccountCreatedEvent) insert: {}", insert); } private void handle(EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + + final var document = documentOptional.get(); + document.setEmail(event.getNewEmail()); + mongoRepository.save(document); } private void handle(AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + + final var document = documentOptional.get(); + document.setAddress(event.getNewAddress()); + mongoRepository.save(document); } private void handle(BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); + Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); + if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + final var document = documentOptional.get(); + final var balance = document.getBalance(); + final var newBalance = balance.add(event.getAmount()); + document.setBalance(newBalance); + mongoRepository.save(document); } } diff --git a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java index 76ddd6e..13ffbcc 100644 --- a/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java +++ b/src/main/java/com/eventsourcing/bankAccount/queries/BankAccountQueryHandler.java @@ -1,13 +1,17 @@ package com.eventsourcing.bankAccount.queries; import com.eventsourcing.bankAccount.domain.BankAccountAggregate; +import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO; +import com.eventsourcing.bankAccount.repository.BankAccountMongoRepository; import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.Optional; + @Slf4j @RequiredArgsConstructor @@ -15,10 +19,19 @@ import org.springframework.stereotype.Service; public class BankAccountQueryHandler implements BankAccountQueryService { private final EventStoreDB eventStoreDB; + private final BankAccountMongoRepository mongoRepository; @Override public BankAccountResponseDTO handle(GetBankAccountByIDQuery query) { + Optional optionalDocument = mongoRepository.findByAggregateId(query.aggregateID()); + if (optionalDocument.isPresent()) { + return BankAccountMapper.bankAccountResponseDTOFromDocument(optionalDocument.get()); + } + final var aggregate = eventStoreDB.load(query.aggregateID(), BankAccountAggregate.class); + BankAccountDocument savedDocument = mongoRepository.save(BankAccountMapper.bankAccountDocumentFromAggregate(aggregate)); + log.info("(GetBankAccountByIDQuery) savedDocument: {}", savedDocument); + final var bankAccountResponseDTO = BankAccountMapper.bankAccountResponseDTOFromAggregate(aggregate); log.info("(GetBankAccountByIDQuery) response: {}", bankAccountResponseDTO); return bankAccountResponseDTO; diff --git a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java new file mode 100644 index 0000000..68740dc --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java @@ -0,0 +1,11 @@ +package com.eventsourcing.bankAccount.repository; + +import com.eventsourcing.bankAccount.domain.BankAccountDocument; +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + +public interface BankAccountMongoRepository extends MongoRepository { + + Optional findByAggregateId(String aggregateId); +} diff --git a/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java b/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java new file mode 100644 index 0000000..2577143 --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/MongoConfiguration.java @@ -0,0 +1,27 @@ +package com.eventsourcing.configuration; + +import com.eventsourcing.bankAccount.domain.BankAccountDocument; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.index.Index; + +import javax.annotation.PostConstruct; + +@Configuration +@Slf4j +@RequiredArgsConstructor +public class MongoConfiguration { + + private final MongoTemplate mongoTemplate; + + @PostConstruct + public void mongoInit() { + final var bankAccounts = mongoTemplate.getCollection("bankAccounts"); + final var aggregateIdIndex = mongoTemplate.indexOps(BankAccountDocument.class).ensureIndex(new Index("aggregateId", Sort.Direction.ASC).unique()); + final var indexInfo = mongoTemplate.indexOps(BankAccountDocument.class).getIndexInfo(); + log.info("MongoDB connected, bankAccounts aggregateId index created: {}", indexInfo); + } +} diff --git a/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java b/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java index 55c5eb9..507ab9d 100644 --- a/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java +++ b/src/main/java/com/eventsourcing/mappers/BankAccountMapper.java @@ -1,6 +1,7 @@ package com.eventsourcing.mappers; import com.eventsourcing.bankAccount.domain.BankAccountAggregate; +import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.dto.BankAccountResponseDTO; public final class BankAccountMapper { @@ -19,23 +20,23 @@ public final class BankAccountMapper { ); } -// public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) { -// return new BankAccountResponseDTO( -// bankAccountDocument.getAggregateId(), -// bankAccountDocument.getEmail(), -// bankAccountDocument.getAddress(), -// bankAccountDocument.getUserName(), -// bankAccountDocument.getBalance() -// ); -// } -// -// public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) { -// return BankAccountDocument.builder() -// .aggregateId(bankAccountAggregate.getId()) -// .email(bankAccountAggregate.getEmail()) -// .address(bankAccountAggregate.getAddress()) -// .userName(bankAccountAggregate.getUserName()) -// .balance(bankAccountAggregate.getBalance()) -// .build(); -// } + public static BankAccountResponseDTO bankAccountResponseDTOFromDocument(BankAccountDocument bankAccountDocument) { + return new BankAccountResponseDTO( + bankAccountDocument.getAggregateId(), + bankAccountDocument.getEmail(), + bankAccountDocument.getAddress(), + bankAccountDocument.getUserName(), + bankAccountDocument.getBalance() + ); + } + + public static BankAccountDocument bankAccountDocumentFromAggregate(BankAccountAggregate bankAccountAggregate) { + return BankAccountDocument.builder() + .aggregateId(bankAccountAggregate.getId()) + .email(bankAccountAggregate.getEmail()) + .address(bankAccountAggregate.getAddress()) + .userName(bankAccountAggregate.getUserName()) + .balance(bankAccountAggregate.getBalance()) + .build(); + } } \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e2f4287..8741636 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -27,4 +27,12 @@ logging.level.org.apache.kafka=warn microservice.kafka.topics.bank-account-event-store=bank-account-event-store microservice.kafka.groupId=es_microservice -microservice.kafka.default-concurrency=10 \ No newline at end of file +microservice.kafka.default-concurrency=10 + + +spring.data.mongodb.host=localhost +spring.data.mongodb.port=27017 +spring.data.mongodb.authentication-database=admin +spring.data.mongodb.username=admin +spring.data.mongodb.password=admin +spring.data.mongodb.database=microservices From 3f6d87244493c91fbea129feb68bd173df16e79e Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 12:08:44 +0300 Subject: [PATCH 10/11] feature: add bank account mongo projection improvements --- .../BankAccountMongoProjection.java | 20 ++++++++++++++----- .../BankAccountMongoRepository.java | 2 ++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index fe887b1..f01015f 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -1,6 +1,7 @@ package com.eventsourcing.bankAccount.projection; +import com.eventsourcing.bankAccount.domain.BankAccountAggregate; import com.eventsourcing.bankAccount.domain.BankAccountDocument; import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; @@ -11,6 +12,7 @@ import com.eventsourcing.es.Event; import com.eventsourcing.es.EventStoreDB; import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; +import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -46,7 +48,7 @@ public class BankAccountMongoProjection implements Projection { try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); - this.processEvents( Arrays.stream(events).toList()); + this.processEvents(Arrays.stream(events).toList()); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { @@ -58,9 +60,14 @@ public class BankAccountMongoProjection implements Projection { private void processEvents(List events) { try { - events.forEach(event -> this.when(event)); + events.forEach(this::when); } catch (Exception ex) { // TODO: delete from mongo, get from eventStore, upsert to mongo + mongoRepository.deleteByAggregateId(events.get(0).getAggregateId()); + final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class); + final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate); + BankAccountDocument result = mongoRepository.save(document); + log.info("(processEvents) saved document: {}", result); } } @@ -101,7 +108,8 @@ public class BankAccountMongoProjection implements Projection { private void handle(EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); - if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + if (documentOptional.isEmpty()) + throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); final var document = documentOptional.get(); document.setEmail(event.getNewEmail()); @@ -111,7 +119,8 @@ public class BankAccountMongoProjection implements Projection { private void handle(AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); - if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + if (documentOptional.isEmpty()) + throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); final var document = documentOptional.get(); document.setAddress(event.getNewAddress()); @@ -121,7 +130,8 @@ public class BankAccountMongoProjection implements Projection { private void handle(BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); Optional documentOptional = mongoRepository.findByAggregateId(event.getAggregateId()); - if (documentOptional.isEmpty()) throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); + if (documentOptional.isEmpty()) + throw new RuntimeException("Bank Account Document not found id: {}" + event.getAggregateId()); final var document = documentOptional.get(); final var balance = document.getBalance(); diff --git a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java index 68740dc..5fa5e4e 100644 --- a/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java +++ b/src/main/java/com/eventsourcing/bankAccount/repository/BankAccountMongoRepository.java @@ -8,4 +8,6 @@ import java.util.Optional; public interface BankAccountMongoRepository extends MongoRepository { Optional findByAggregateId(String aggregateId); + + void deleteByAggregateId(String aggregateId); } From 3e642c079c26ae1a8d1123f6d9c6f59a3ea52e44 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 14:42:24 +0300 Subject: [PATCH 11/11] feature: add bank account mongo projection improvements update --- .../projection/BankAccountMongoProjection.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java index f01015f..6ed2f34 100644 --- a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -15,7 +15,6 @@ import com.eventsourcing.es.SerializerUtils; import com.eventsourcing.mappers.BankAccountMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.Acknowledgment; @@ -35,9 +34,6 @@ public class BankAccountMongoProjection implements Projection { private final BankAccountMongoRepository mongoRepository; private final EventStoreDB eventStoreDB; - @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") - private String bankAccountTopicName; - @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"}, groupId = "${microservice.kafka.groupId}", @@ -59,14 +55,15 @@ public class BankAccountMongoProjection implements Projection { } private void processEvents(List events) { + if (events.isEmpty()) return; + try { events.forEach(this::when); } catch (Exception ex) { - // TODO: delete from mongo, get from eventStore, upsert to mongo mongoRepository.deleteByAggregateId(events.get(0).getAggregateId()); final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class); final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate); - BankAccountDocument result = mongoRepository.save(document); + final var result = mongoRepository.save(document); log.info("(processEvents) saved document: {}", result); } }