feature: add bank account mongo projection improvements update
This commit is contained in:
@@ -15,7 +15,6 @@ import com.eventsourcing.es.SerializerUtils;
|
||||
import com.eventsourcing.mappers.BankAccountMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
@@ -35,9 +34,6 @@ public class BankAccountMongoProjection implements Projection {
|
||||
private final BankAccountMongoRepository mongoRepository;
|
||||
private final EventStoreDB eventStoreDB;
|
||||
|
||||
@Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}")
|
||||
private String bankAccountTopicName;
|
||||
|
||||
|
||||
@KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"},
|
||||
groupId = "${microservice.kafka.groupId}",
|
||||
@@ -59,14 +55,15 @@ public class BankAccountMongoProjection implements Projection {
|
||||
}
|
||||
|
||||
private void processEvents(List<Event> events) {
|
||||
if (events.isEmpty()) return;
|
||||
|
||||
try {
|
||||
events.forEach(this::when);
|
||||
} catch (Exception ex) {
|
||||
// TODO: delete from mongo, get from eventStore, upsert to mongo
|
||||
mongoRepository.deleteByAggregateId(events.get(0).getAggregateId());
|
||||
final var aggregate = eventStoreDB.load(events.get(0).getAggregateId(), BankAccountAggregate.class);
|
||||
final var document = BankAccountMapper.bankAccountDocumentFromAggregate(aggregate);
|
||||
BankAccountDocument result = mongoRepository.save(document);
|
||||
final var result = mongoRepository.save(document);
|
||||
log.info("(processEvents) saved document: {}", result);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user