From 15ca094c0112791ecc0841d855d117d065ed3b80 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 09:56:46 +0300 Subject: [PATCH] feature: add kafka event bus --- pom.xml | 4 ++ .../BankAccountMongoProjection.java | 41 ++++++++++++++++++ .../KafkaTopicConfiguration.java | 2 +- .../java/com/eventsourcing/es/EventStore.java | 5 +++ .../com/eventsourcing/es/KafkaEventBus.java | 37 ++++++++++++++++ src/main/resources/application.properties | 6 +-- .../db/migration/V1__initial_setup.sql | 43 +++++++++++++++++++ 7 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java create mode 100644 src/main/java/com/eventsourcing/es/KafkaEventBus.java create mode 100644 src/main/resources/db/migration/V1__initial_setup.sql diff --git a/pom.xml b/pom.xml index ed323ee..81db9a2 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,10 @@ 17 + + org.flywaydb + flyway-core + org.springframework.boot spring-boot-starter-validation diff --git a/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java new file mode 100644 index 0000000..98d9f06 --- /dev/null +++ b/src/main/java/com/eventsourcing/bankAccount/projection/BankAccountMongoProjection.java @@ -0,0 +1,41 @@ +package com.eventsourcing.bankAccount.projection; + + +import com.eventsourcing.es.Event; +import com.eventsourcing.es.SerializerUtils; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +import java.util.Arrays; + +@Service +@Slf4j +@RequiredArgsConstructor +public class BankAccountMongoProjection { + + @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") + private String bankAccountTopicName; + + + @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"}, + groupId = "${microservice.kafka.groupId}", + concurrency = "${microservice.kafka.default-concurrency}") + public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) { + log.info("(BankAccountMongoProjection) data: {}", new String(data)); + + try { + final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); + ack.acknowledge(); + log.info("ack events: {}", Arrays.toString(events)); + } catch (Exception e) { + ack.nack(100); + log.error("bankAccountMongoProjectionListener: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java index 84d98ab..6f33442 100644 --- a/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java +++ b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java @@ -21,7 +21,7 @@ public class KafkaTopicConfiguration { @Value(value = "${kafka.bootstrapServers:localhost:9093}") private String bootstrapServers; - @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") + @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @Bean diff --git a/src/main/java/com/eventsourcing/es/EventStore.java b/src/main/java/com/eventsourcing/es/EventStore.java index b7c5054..a1f3f2c 100644 --- a/src/main/java/com/eventsourcing/es/EventStore.java +++ b/src/main/java/com/eventsourcing/es/EventStore.java @@ -25,6 +25,7 @@ public class EventStore implements EventStoreDB { private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id"; private final NamedParameterJdbcTemplate jdbcTemplate; + private final EventBus eventBus; @Override public void saveEvents(List events) { @@ -78,6 +79,8 @@ public class EventStore implements EventStoreDB { @Override @Transactional public void save(T aggregate) { + final List aggregateEvents = new ArrayList<>(aggregate.getChanges()); + if (aggregate.getVersion() > 1) { this.handleConcurrency(aggregate.getId()); } @@ -87,6 +90,8 @@ public class EventStore implements EventStoreDB { this.saveSnapshot(aggregate); } + eventBus.publish(aggregateEvents); + log.info("(save) saved aggregate: {}", aggregate); } diff --git a/src/main/java/com/eventsourcing/es/KafkaEventBus.java b/src/main/java/com/eventsourcing/es/KafkaEventBus.java new file mode 100644 index 0000000..68b0ce4 --- /dev/null +++ b/src/main/java/com/eventsourcing/es/KafkaEventBus.java @@ -0,0 +1,37 @@ +package com.eventsourcing.es; + + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Service +@Slf4j +@RequiredArgsConstructor +public class KafkaEventBus implements EventBus { + private final KafkaTemplate kafkaTemplate; + + @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") + private String bankAccountTopicName; + + @Override + public void publish(List events) { + final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{})); + final ProducerRecord record = new ProducerRecord<>(bankAccountTopicName, eventsBytes); + + try { + kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS); + log.info("publishing kafka record value >>>>> {}", new String(record.value())); + + } catch (Exception ex) { + log.error("(KafkaEventBus) publish get", ex); + throw new RuntimeException(ex); + } + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 60a3792..e2f4287 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -25,6 +25,6 @@ spring.kafka.consumer.group-id=microservice logging.level.org.apache.kafka=warn -order.kafka.topics.bank-account-event-store=bank-account-event-store -order.kafka.groupId=es_microservice -order.kafka.default-concurrency=10 \ No newline at end of file +microservice.kafka.topics.bank-account-event-store=bank-account-event-store +microservice.kafka.groupId=es_microservice +microservice.kafka.default-concurrency=10 \ No newline at end of file diff --git a/src/main/resources/db/migration/V1__initial_setup.sql b/src/main/resources/db/migration/V1__initial_setup.sql new file mode 100644 index 0000000..e21ddfb --- /dev/null +++ b/src/main/resources/db/migration/V1__initial_setup.sql @@ -0,0 +1,43 @@ +CREATE EXTENSION IF NOT EXISTS citext; +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +DROP TABLE IF EXISTS events CASCADE; +DROP TABLE IF EXISTS snapshots CASCADE; + + +CREATE TABLE IF NOT EXISTS events +( + event_id UUID DEFAULT uuid_generate_v4(), + aggregate_id VARCHAR(250) NOT NULL CHECK ( aggregate_id <> '' ), + aggregate_type VARCHAR(250) NOT NULL CHECK ( aggregate_type <> '' ), + event_type VARCHAR(250) NOT NULL CHECK ( event_type <> '' ), + data BYTEA, + metadata BYTEA, + version SERIAL NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + UNIQUE (aggregate_id, version) +) PARTITION BY HASH (aggregate_id); + +CREATE INDEX IF NOT EXISTS aggregate_id_aggregate_version_idx ON events USING btree (aggregate_id, version ASC); + +CREATE TABLE IF NOT EXISTS events_partition_hash_1 PARTITION OF events + FOR VALUES WITH (MODULUS 3, REMAINDER 0); + +CREATE TABLE IF NOT EXISTS events_partition_hash_2 PARTITION OF events + FOR VALUES WITH (MODULUS 3, REMAINDER 1); + +CREATE TABLE IF NOT EXISTS events_partition_hash_3 PARTITION OF events + FOR VALUES WITH (MODULUS 3, REMAINDER 2); + +CREATE TABLE IF NOT EXISTS snapshots +( + snapshot_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + aggregate_id VARCHAR(250) UNIQUE NOT NULL CHECK ( aggregate_id <> '' ), + aggregate_type VARCHAR(250) NOT NULL CHECK ( aggregate_type <> '' ), + data BYTEA, + metadata BYTEA, + version SERIAL NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + UNIQUE (aggregate_id) +); + +CREATE INDEX IF NOT EXISTS aggregate_id_aggregate_version_idx ON snapshots USING btree (aggregate_id, version); \ No newline at end of file