feature: add kafka event bus

This commit is contained in:
Alexander
2022-04-14 09:56:46 +03:00
parent 35b7348234
commit 0e1f096542
7 changed files with 134 additions and 4 deletions

View File

@@ -17,6 +17,10 @@
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>

View File

@@ -0,0 +1,41 @@
package com.eventsourcing.bankAccount.projection;
import com.eventsourcing.es.Event;
import com.eventsourcing.es.SerializerUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.Arrays;
@Service
@Slf4j
@RequiredArgsConstructor
public class BankAccountMongoProjection {
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
private String bankAccountTopicName;
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
groupId = "${microservice.kafka.groupId}",
concurrency = "${microservice.kafka.default-concurrency}")
public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
log.info("(BankAccountMongoProjection) data: {}", new String(data));
try {
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
ack.acknowledge();
log.info("ack events: {}", Arrays.toString(events));
} catch (Exception e) {
ack.nack(100);
log.error("bankAccountMongoProjectionListener: {}", e.getMessage());
}
}
}

View File

@@ -21,7 +21,7 @@ public class KafkaTopicConfiguration {
@Value(value = "${kafka.bootstrapServers:localhost:9093}")
private String bootstrapServers;
@Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}")
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
private String bankAccountTopicName;
@Bean

View File

@@ -25,6 +25,7 @@ public class EventStore implements EventStoreDB {
private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id";
private final NamedParameterJdbcTemplate jdbcTemplate;
private final EventBus eventBus;
@Override
public void saveEvents(List<Event> events) {
@@ -78,6 +79,8 @@ public class EventStore implements EventStoreDB {
@Override
@Transactional
public <T extends AggregateRoot> void save(T aggregate) {
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
if (aggregate.getVersion() > 1) {
this.handleConcurrency(aggregate.getId());
}
@@ -87,6 +90,8 @@ public class EventStore implements EventStoreDB {
this.saveSnapshot(aggregate);
}
eventBus.publish(aggregateEvents);
log.info("(save) saved aggregate: {}", aggregate);
}

View File

@@ -0,0 +1,37 @@
package com.eventsourcing.es;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaEventBus implements EventBus {
private final KafkaTemplate<String, byte[]> kafkaTemplate;
@Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}")
private String bankAccountTopicName;
@Override
public void publish(List<Event> events) {
final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);
try {
kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS);
log.info("publishing kafka record value >>>>> {}", new String(record.value()));
} catch (Exception ex) {
log.error("(KafkaEventBus) publish get", ex);
throw new RuntimeException(ex);
}
}
}

View File

@@ -25,6 +25,6 @@ spring.kafka.consumer.group-id=microservice
logging.level.org.apache.kafka=warn
order.kafka.topics.bank-account-event-store=bank-account-event-store
order.kafka.groupId=es_microservice
order.kafka.default-concurrency=10
microservice.kafka.topics.bank-account-event-store=bank-account-event-store
microservice.kafka.groupId=es_microservice
microservice.kafka.default-concurrency=10

View File

@@ -0,0 +1,43 @@
CREATE EXTENSION IF NOT EXISTS citext;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
DROP TABLE IF EXISTS events CASCADE;
DROP TABLE IF EXISTS snapshots CASCADE;
CREATE TABLE IF NOT EXISTS events
(
event_id UUID DEFAULT uuid_generate_v4(),
aggregate_id VARCHAR(250) NOT NULL CHECK ( aggregate_id <> '' ),
aggregate_type VARCHAR(250) NOT NULL CHECK ( aggregate_type <> '' ),
event_type VARCHAR(250) NOT NULL CHECK ( event_type <> '' ),
data BYTEA,
metadata BYTEA,
version SERIAL NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE (aggregate_id, version)
) PARTITION BY HASH (aggregate_id);
CREATE INDEX IF NOT EXISTS aggregate_id_aggregate_version_idx ON events USING btree (aggregate_id, version ASC);
CREATE TABLE IF NOT EXISTS events_partition_hash_1 PARTITION OF events
FOR VALUES WITH (MODULUS 3, REMAINDER 0);
CREATE TABLE IF NOT EXISTS events_partition_hash_2 PARTITION OF events
FOR VALUES WITH (MODULUS 3, REMAINDER 1);
CREATE TABLE IF NOT EXISTS events_partition_hash_3 PARTITION OF events
FOR VALUES WITH (MODULUS 3, REMAINDER 2);
CREATE TABLE IF NOT EXISTS snapshots
(
snapshot_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
aggregate_id VARCHAR(250) UNIQUE NOT NULL CHECK ( aggregate_id <> '' ),
aggregate_type VARCHAR(250) NOT NULL CHECK ( aggregate_type <> '' ),
data BYTEA,
metadata BYTEA,
version SERIAL NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE (aggregate_id)
);
CREATE INDEX IF NOT EXISTS aggregate_id_aggregate_version_idx ON snapshots USING btree (aggregate_id, version);