package com.eventsourcing.bankAccount.projection; import com.eventsourcing.bankAccount.events.AddressUpdatedEvent; import com.eventsourcing.bankAccount.events.BalanceDepositedEvent; import com.eventsourcing.bankAccount.events.BankAccountCreatedEvent; import com.eventsourcing.bankAccount.events.EmailChangedEvent; import com.eventsourcing.es.Event; import com.eventsourcing.es.Projection; import com.eventsourcing.es.SerializerUtils; import com.eventsourcing.exceptions.UnknownEventTypeException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata; import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import java.util.Arrays; @Service @Slf4j @RequiredArgsConstructor public class BankAccountMongoProjection implements Projection { @Value(value = "${microservice.kafka.topics.bank-account-event-store:bank-account-event-store}") private String bankAccountTopicName; @KafkaListener(topics = {"${microservice.kafka.topics.bank-account-event-store}"}, groupId = "${microservice.kafka.groupId}", concurrency = "${microservice.kafka.default-concurrency}") public void bankAccountMongoProjectionListener(@Payload byte[] data, ConsumerRecordMetadata meta, Acknowledgment ack) { log.info("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp()); log.info("(BankAccountMongoProjection) data: {}", new String(data)); try { final Event[] events = SerializerUtils.deserializeEventsFromJsonBytes(data); Arrays.stream(events).toList().forEach(this::when); ack.acknowledge(); log.info("ack events: {}", Arrays.toString(events)); } catch (Exception e) { ack.nack(100); log.error("(BankAccountMongoProjection) topic: {}, offset: {}, partition: {}, timestamp: {}", meta.topic(), meta.offset(), meta.partition(), meta.timestamp()); log.error("bankAccountMongoProjectionListener: {}", e.getMessage()); } } @Override public void when(Event event) { final var aggregateId = event.getAggregateId(); log.info("(when) >>>>> aggregateId: {}", aggregateId); switch (event.getEventType()) { case BankAccountCreatedEvent.BANK_ACCOUNT_CREATED_V1 -> handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BankAccountCreatedEvent.class)); case EmailChangedEvent.EMAIL_CHANGED_V1 -> handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), EmailChangedEvent.class)); case AddressUpdatedEvent.ADDRESS_UPDATED_V1 -> handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), AddressUpdatedEvent.class)); case BalanceDepositedEvent.BALANCE_DEPOSITED -> handle(SerializerUtils.deserializeFromJsonBytes(event.getData(), BalanceDepositedEvent.class)); default -> throw new UnknownEventTypeException(event.getEventType()); } } private void handle(BankAccountCreatedEvent event) { log.info("(when) BankAccountCreatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); // final var document = BankAccountDocument.builder() // .aggregateId(event.getAggregateId()) // .email(event.getEmail()) // .address(event.getAddress()) // .userName(event.getUserName()) // .balance(BigDecimal.valueOf(0)) // .build(); } private void handle(EmailChangedEvent event) { log.info("(when) EmailChangedEvent: {}, aggregateID: {}", event, event.getAggregateId()); } private void handle(AddressUpdatedEvent event) { log.info("(when) AddressUpdatedEvent: {}, aggregateID: {}", event, event.getAggregateId()); } private void handle(BalanceDepositedEvent event) { log.info("(when) BalanceDepositedEvent: {}, aggregateID: {}", event, event.getAggregateId()); } }