feature: add kafka event bus
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
package com.eventsourcing.bankAccount.projection;
|
||||
|
||||
|
||||
import com.eventsourcing.es.Event;
|
||||
import com.eventsourcing.es.SerializerUtils;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BankAccountMongoProjection {
|
||||
|
||||
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
private String bankAccountTopicName;
|
||||
|
||||
|
||||
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
|
||||
groupId = "${microservice.kafka.groupId}",
|
||||
concurrency = "${microservice.kafka.default-concurrency}")
|
||||
public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) {
|
||||
log.info("(BankAccountMongoProjection) data: {}", new String(data));
|
||||
|
||||
try {
|
||||
final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data);
|
||||
ack.acknowledge();
|
||||
log.info("ack events: {}", Arrays.toString(events));
|
||||
} catch (Exception e) {
|
||||
ack.nack(100);
|
||||
log.error("bankAccountMongoProjectionListener: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -21,7 +21,7 @@ public class KafkaTopicConfiguration {
|
||||
@Value(value = "${kafka.bootstrapServers:localhost:9093}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
private String bankAccountTopicName;
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -25,6 +25,7 @@ public class EventStore implements EventStoreDB {
|
||||
private static final String EXISTS_QUERY = "SELECT aggregate_id FROM events WHERE e e.aggregate_id = :aggregate_id";
|
||||
|
||||
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||
private final EventBus eventBus;
|
||||
|
||||
@Override
|
||||
public void saveEvents(List<Event> events) {
|
||||
@@ -78,6 +79,8 @@ public class EventStore implements EventStoreDB {
|
||||
@Override
|
||||
@Transactional
|
||||
public <T extends AggregateRoot> void save(T aggregate) {
|
||||
final List<Event> aggregateEvents = new ArrayList<>(aggregate.getChanges());
|
||||
|
||||
if (aggregate.getVersion() > 1) {
|
||||
this.handleConcurrency(aggregate.getId());
|
||||
}
|
||||
@@ -87,6 +90,8 @@ public class EventStore implements EventStoreDB {
|
||||
this.saveSnapshot(aggregate);
|
||||
}
|
||||
|
||||
eventBus.publish(aggregateEvents);
|
||||
|
||||
log.info("(save) saved aggregate: {}", aggregate);
|
||||
}
|
||||
|
||||
|
||||
37
src/main/java/com/eventsourcing/es/KafkaEventBus.java
Normal file
37
src/main/java/com/eventsourcing/es/KafkaEventBus.java
Normal file
@@ -0,0 +1,37 @@
|
||||
package com.eventsourcing.es;
|
||||
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class KafkaEventBus implements EventBus {
|
||||
private final KafkaTemplate<String, byte[]> kafkaTemplate;
|
||||
|
||||
@Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
private String bankAccountTopicName;
|
||||
|
||||
@Override
|
||||
public void publish(List<Event> events) {
|
||||
final byte[] eventsBytes = SerializerUtils.serializeToJsonBytes(events.toArray(new Event[]{}));
|
||||
final ProducerRecord<String, byte[]> record = new ProducerRecord<>(bankAccountTopicName, eventsBytes);
|
||||
|
||||
try {
|
||||
kafkaTemplate.send(record).get(3000, TimeUnit.MILLISECONDS);
|
||||
log.info("publishing kafka record value >>>>> {}", new String(record.value()));
|
||||
|
||||
} catch (Exception ex) {
|
||||
log.error("(KafkaEventBus) publish get", ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user