diff --git a/payment-service/payment-container/src/test/java/com/food/order/system/PaymentRequestMessageListenerTest.java b/payment-service/payment-container/src/test/java/com/food/order/system/PaymentRequestMessageListenerTest.java new file mode 100644 index 0000000..7f64d62 --- /dev/null +++ b/payment-service/payment-container/src/test/java/com/food/order/system/PaymentRequestMessageListenerTest.java @@ -0,0 +1,115 @@ +package com.food.order.system; + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.payment.application.service.dto.PaymentRequest; +import com.food.order.system.payment.application.service.ports.input.message.listener.PaymentRequestMessageListener; +import com.food.order.system.payment.data.access.outobx.entity.OrderOutboxEntity; +import com.food.order.system.payment.data.access.outobx.repository.OrderOutboxJpaRepository; +import com.food.order.system.payment.service.domain.PaymentServiceApplication; +import com.food.order.system.valueobject.PaymentOrderStatus; +import com.food.order.system.valueobject.PaymentStatus; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.postgresql.util.PSQLException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.dao.DataAccessException; + +import java.math.BigDecimal; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Slf4j +@SpringBootTest(classes = {PaymentServiceApplication.class}) +class PaymentRequestMessageListenerTest { + + @Autowired + private PaymentRequestMessageListener paymentRequestMessageListener; + + @Autowired + private OrderOutboxJpaRepository orderOutboxRepository; + + private final static String CUSTOMER_ID = "d215b5f8-0249-4dc5-89a3-51fd148cfb41"; + private final static BigDecimal PRICE = new BigDecimal("100"); + + @Test + void testDoublePayment() { + String sagaId = UUID.randomUUID().toString(); + paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId)); + try { + paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId)); + } catch (DataAccessException e) { + log.error("DataAccessException occurred with sql state: {}", + ((PSQLException) Objects.requireNonNull(e.getRootCause())).getSQLState()); + } + assertOrderOutbox(sagaId); + } + + @Test + void testDoublePaymentWithThreads() { + String sagaId = UUID.randomUUID().toString(); + ExecutorService executor = null; + + try { + executor = Executors.newFixedThreadPool(2); + List> tasks = new ArrayList<>(); + + tasks.add(Executors.callable(() -> { + try { + paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId)); + } catch (DataAccessException e) { + log.error("DataAccessException occurred for thread 1 with sql state: {}", + ((PSQLException) Objects.requireNonNull(e.getRootCause())).getSQLState()); + } + })); + + tasks.add(Executors.callable(() -> { + try { + paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId)); + } catch (DataAccessException e) { + log.error("DataAccessException occurred for thread 2 with sql state: {}", + ((PSQLException) Objects.requireNonNull(e.getRootCause())).getSQLState()); + } + })); + + executor.invokeAll(tasks); + + assertOrderOutbox(sagaId); + } catch (InterruptedException e) { + log.error("Error calling complete payment!", e); + } finally { + if (executor != null) { + executor.shutdown(); + } + } + } + + private void assertOrderOutbox(String sagaId) { + Optional orderOutboxEntity = orderOutboxRepository + .findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(ORDER_PROCESSING_SAGA, + UUID.fromString(sagaId), + PaymentStatus.COMPLETED, + OutboxStatus.STARTED); + assertTrue(orderOutboxEntity.isPresent()); + assertEquals(orderOutboxEntity.get().getSagaId().toString(), sagaId); + } + + private PaymentRequest getPaymentRequest(String sagaId) { + return PaymentRequest.builder() + .id(UUID.randomUUID().toString()) + .sagaId(sagaId) + .orderId(UUID.randomUUID().toString()) + .status(PaymentOrderStatus.PENDING) + .customerId(CUSTOMER_ID) + .price(PRICE) + .createdAt(Instant.now()) + .build(); + } +} diff --git a/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/adapter/OrderOutboxRepositoryImpl.java b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/adapter/OrderOutboxRepositoryImpl.java new file mode 100644 index 0000000..66347d4 --- /dev/null +++ b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/adapter/OrderOutboxRepositoryImpl.java @@ -0,0 +1,58 @@ +package com.food.order.system.payment.data.access.outobx.adapter; + + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.payment.application.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.payment.application.service.ports.output.repository.OrderOutboxRepository; +import com.food.order.system.payment.data.access.outobx.exception.OrderOutboxNotFoundException; +import com.food.order.system.payment.data.access.outobx.mapper.OrderOutboxDataAccessMapper; +import com.food.order.system.payment.data.access.outobx.repository.OrderOutboxJpaRepository; +import com.food.order.system.valueobject.PaymentStatus; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +@Component +@RequiredArgsConstructor +public class OrderOutboxRepositoryImpl implements OrderOutboxRepository { + + private final OrderOutboxJpaRepository orderOutboxJpaRepository; + private final OrderOutboxDataAccessMapper orderOutboxDataAccessMapper; + + @Override + public OrderOutboxMessage save(OrderOutboxMessage orderPaymentOutboxMessage) { + return orderOutboxDataAccessMapper + .orderOutboxEntityToOrderOutboxMessage(orderOutboxJpaRepository + .save(orderOutboxDataAccessMapper + .orderOutboxMessageToOutboxEntity(orderPaymentOutboxMessage))); + } + + @Override + public Optional> findByTypeAndOutboxStatus(String sagaType, OutboxStatus outboxStatus) { + return Optional.of(orderOutboxJpaRepository.findByTypeAndOutboxStatus(sagaType, outboxStatus) + .orElseThrow(() -> new OrderOutboxNotFoundException("Approval outbox object " + + "cannot be found for saga type " + sagaType)) + .stream() + .map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage) + .collect(Collectors.toList())); + } + + @Override + public Optional findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(String sagaType, + UUID sagaId, + PaymentStatus paymentStatus, + OutboxStatus outboxStatus) { + return orderOutboxJpaRepository.findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(sagaType, sagaId, + paymentStatus, outboxStatus) + .map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage); + } + + @Override + public void deleteByTypeAndOutboxStatus(String sagaType, OutboxStatus outboxStatus) { + orderOutboxJpaRepository.deleteByTypeAndOutboxStatus(sagaType, outboxStatus); + } +} diff --git a/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/entity/OrderOutboxEntity.java b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/entity/OrderOutboxEntity.java new file mode 100644 index 0000000..173e8e3 --- /dev/null +++ b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/entity/OrderOutboxEntity.java @@ -0,0 +1,49 @@ +package com.food.order.system.payment.data.access.outobx.entity; + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.valueobject.PaymentStatus; +import lombok.*; + +import javax.persistence.*; +import java.time.ZonedDateTime; +import java.util.Objects; +import java.util.UUID; + +@Getter +@Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Table(name = "order_outbox") +@Entity +public class OrderOutboxEntity { + + @Id + private UUID id; + private UUID sagaId; + private ZonedDateTime createdAt; + private ZonedDateTime processedAt; + private String type; + private String payload; + @Enumerated(EnumType.STRING) + private OutboxStatus outboxStatus; + @Enumerated(EnumType.STRING) + private PaymentStatus paymentStatus; + @Version + private int version; + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OrderOutboxEntity that = (OrderOutboxEntity) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } +} + diff --git a/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/exception/OrderOutboxNotFoundException.java b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/exception/OrderOutboxNotFoundException.java new file mode 100644 index 0000000..48ade07 --- /dev/null +++ b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/exception/OrderOutboxNotFoundException.java @@ -0,0 +1,8 @@ +package com.food.order.system.payment.data.access.outobx.exception; + +public class OrderOutboxNotFoundException extends RuntimeException { + + public OrderOutboxNotFoundException(String message) { + super(message); + } +} diff --git a/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/mapper/OrderOutboxDataAccessMapper.java b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/mapper/OrderOutboxDataAccessMapper.java new file mode 100644 index 0000000..a6ed8c8 --- /dev/null +++ b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/mapper/OrderOutboxDataAccessMapper.java @@ -0,0 +1,37 @@ +package com.food.order.system.payment.data.access.outobx.mapper; + + +import com.food.order.system.payment.application.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.payment.data.access.outobx.entity.OrderOutboxEntity; +import org.springframework.stereotype.Component; + +@Component +public class OrderOutboxDataAccessMapper { + + public OrderOutboxEntity orderOutboxMessageToOutboxEntity(OrderOutboxMessage orderOutboxMessage) { + return OrderOutboxEntity.builder() + .id(orderOutboxMessage.getId()) + .sagaId(orderOutboxMessage.getSagaId()) + .createdAt(orderOutboxMessage.getCreatedAt()) + .type(orderOutboxMessage.getType()) + .payload(orderOutboxMessage.getPayload()) + .outboxStatus(orderOutboxMessage.getOutboxStatus()) + .paymentStatus(orderOutboxMessage.getPaymentStatus()) + .version(orderOutboxMessage.getVersion()) + .build(); + } + + public OrderOutboxMessage orderOutboxEntityToOrderOutboxMessage(OrderOutboxEntity paymentOutboxEntity) { + return OrderOutboxMessage.builder() + .id(paymentOutboxEntity.getId()) + .sagaId(paymentOutboxEntity.getSagaId()) + .createdAt(paymentOutboxEntity.getCreatedAt()) + .type(paymentOutboxEntity.getType()) + .payload(paymentOutboxEntity.getPayload()) + .outboxStatus(paymentOutboxEntity.getOutboxStatus()) + .paymentStatus(paymentOutboxEntity.getPaymentStatus()) + .version(paymentOutboxEntity.getVersion()) + .build(); + } + +} diff --git a/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/repository/OrderOutboxJpaRepository.java b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/repository/OrderOutboxJpaRepository.java new file mode 100644 index 0000000..e67f7f6 --- /dev/null +++ b/payment-service/payment-dataaccess/src/main/java/com/food/order/system/payment/data/access/outobx/repository/OrderOutboxJpaRepository.java @@ -0,0 +1,26 @@ +package com.food.order.system.payment.data.access.outobx.repository; + + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.payment.data.access.outobx.entity.OrderOutboxEntity; +import com.food.order.system.valueobject.PaymentStatus; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +@Repository +public interface OrderOutboxJpaRepository extends JpaRepository { + + Optional> findByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus); + + Optional findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(String type, + UUID sagaId, + PaymentStatus paymentStatus, + OutboxStatus outboxStatus); + + void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus); + +} diff --git a/payment-service/payment-domain/payment-application-service/src/main/java/com/food/order/system/payment/application/service/PaymentRequestHelper.java b/payment-service/payment-domain/payment-application-service/src/main/java/com/food/order/system/payment/application/service/PaymentRequestHelper.java index 8f046bb..d40a01c 100644 --- a/payment-service/payment-domain/payment-application-service/src/main/java/com/food/order/system/payment/application/service/PaymentRequestHelper.java +++ b/payment-service/payment-domain/payment-application-service/src/main/java/com/food/order/system/payment/application/service/PaymentRequestHelper.java @@ -13,6 +13,7 @@ import com.food.order.system.payment.service.domain.PaymentDomainService; import com.food.order.system.payment.service.domain.entity.CreditEntry; import com.food.order.system.payment.service.domain.entity.CreditHistory; import com.food.order.system.payment.service.domain.entity.Payment; +import com.food.order.system.payment.service.domain.exception.PaymentNotFoundException; import com.food.order.system.valueobject.CustomerId; import com.food.order.system.valueobject.PaymentStatus; import lombok.RequiredArgsConstructor; @@ -62,10 +63,6 @@ public class PaymentRequestHelper { paymentEvent.getPayment().getStatus(), OutboxStatus.STARTED, UUID.fromString(paymentRequest.getSagaId())); - - - - } private boolean publishIfOutboxMessageProcessedForPayment(PaymentRequest paymentRequest, @@ -91,7 +88,7 @@ public class PaymentRequestHelper { log.info("Received payment cancel event for id : {}", paymentRequest.getOrderId()); var payment = paymentRepository.findByOrderId (UUID.fromString(paymentRequest.getOrderId())).orElseThrow( - () -> new PaymentApplicationServiceException("Payment not found")); + () -> new PaymentNotFoundException("Payment not found")); var creditEntry = getCreditEntry(payment.getCustomerId()); var creditHistory = getCreditHistory(payment.getCustomerId()); List failureMessage = new ArrayList<>(); diff --git a/payment-service/payment-messaging/pom.xml b/payment-service/payment-messaging/pom.xml index cd274e4..67b72b5 100644 --- a/payment-service/payment-messaging/pom.xml +++ b/payment-service/payment-messaging/pom.xml @@ -31,5 +31,10 @@ com.food.order kafka-model + + + org.postgresql + postgresql + \ No newline at end of file diff --git a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/listener/kafka/PaymentRequestKafkaListener.java b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/listener/kafka/PaymentRequestKafkaListener.java index 00787cf..1adf019 100644 --- a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/listener/kafka/PaymentRequestKafkaListener.java +++ b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/listener/kafka/PaymentRequestKafkaListener.java @@ -2,18 +2,24 @@ package com.food.order.system.payment.messaging.listener.kafka; import com.food.order.system.kafka.consumer.KafkaConsumer; import com.food.order.system.kafka.order.avro.model.PaymentRequestAvroModel; +import com.food.order.system.payment.application.service.exception.PaymentApplicationServiceException; import com.food.order.system.payment.application.service.ports.input.message.listener.PaymentRequestMessageListener; import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper; +import com.food.order.system.payment.service.domain.exception.PaymentNotFoundException; import com.food.order.system.valueobject.PaymentOrderStatus; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.postgresql.util.PSQLState; +import org.springframework.dao.DataAccessException; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; +import java.sql.SQLException; import java.util.List; +import java.util.Objects; @Component @Slf4j @@ -38,14 +44,33 @@ public class PaymentRequestKafkaListener implements KafkaConsumer { - if (PaymentOrderStatus.PENDING.name().equalsIgnoreCase(paymentRequestAvroModel.getPaymentOrderStatus().name())) { - log.info("Processing payment for order id: {}", paymentRequestAvroModel.getOrderId()); - paymentRequestMessageListener.completePayment(paymentMessagingDataMapper - .paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel)); - } else if(PaymentOrderStatus.CANCELLED.name().equalsIgnoreCase(paymentRequestAvroModel.getPaymentOrderStatus().name())) { - log.info("Cancelling payment for order id: {}", paymentRequestAvroModel.getOrderId()); - paymentRequestMessageListener.cancelPayment(paymentMessagingDataMapper - .paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel)); + try { + if (Objects.equals(PaymentOrderStatus.PENDING.name(), + paymentRequestAvroModel.getPaymentOrderStatus().name())) { + log.info("Processing payment for order id: {}", paymentRequestAvroModel.getOrderId()); + paymentRequestMessageListener.completePayment(paymentMessagingDataMapper + .paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel)); + } else if(PaymentOrderStatus.CANCELLED.name().equals + (paymentRequestAvroModel.getPaymentOrderStatus().name())) { + log.info("Cancelling payment for order id: {}", paymentRequestAvroModel.getOrderId()); + paymentRequestMessageListener.cancelPayment(paymentMessagingDataMapper + .paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel)); + } + } catch (DataAccessException e) { + SQLException sqlException = (SQLException) e.getRootCause(); + if (sqlException != null && sqlException.getSQLState() != null && + PSQLState.UNIQUE_VIOLATION.getState().equals(sqlException.getSQLState())) { + //NO-OP for unique constraint exception + log.error("Caught unique constraint exception with sql state: {} " + + "in PaymentRequestKafkaListener for order id: {}", + sqlException.getSQLState(), paymentRequestAvroModel.getOrderId()); + } else { + throw new PaymentApplicationServiceException("Throwing DataAccessException in" + + " PaymentRequestKafkaListener: " + e.getMessage(), e); + } + } catch (PaymentNotFoundException e) { + //NO-OP for PaymentNotFoundException + log.error("No payment found for order id: {}", paymentRequestAvroModel.getOrderId()); } }); diff --git a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/mapper/PaymentMessagingDataMapper.java b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/mapper/PaymentMessagingDataMapper.java index 83aee89..0057c4e 100644 --- a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/mapper/PaymentMessagingDataMapper.java +++ b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/mapper/PaymentMessagingDataMapper.java @@ -4,9 +4,7 @@ import com.food.order.system.kafka.order.avro.model.PaymentRequestAvroModel; import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel; import com.food.order.system.kafka.order.avro.model.PaymentStatus; import com.food.order.system.payment.application.service.dto.PaymentRequest; -import com.food.order.system.payment.service.domain.event.PaymentCancelledEvent; -import com.food.order.system.payment.service.domain.event.PaymentCompletedEvent; -import com.food.order.system.payment.service.domain.event.PaymentFailedEvent; +import com.food.order.system.payment.application.service.outbox.model.OrderEventPayload; import com.food.order.system.valueobject.PaymentOrderStatus; import org.springframework.stereotype.Component; @@ -15,47 +13,6 @@ import java.util.UUID; @Component public class PaymentMessagingDataMapper { - public PaymentResponseAvroModel paymentCompletedEventToPaymentResponseAvroModel(PaymentCompletedEvent paymentCompletedEvent) { - return PaymentResponseAvroModel.newBuilder() - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setPaymentId(paymentCompletedEvent.getPayment().getId().getValue().toString()) - .setCustomerId(paymentCompletedEvent.getPayment().getCustomerId().getValue().toString()) - .setOrderId(paymentCompletedEvent.getPayment().getOrderId().getValue().toString()) - .setPrice(paymentCompletedEvent.getPayment().getPrice().getAmount()) - .setCreatedAt(paymentCompletedEvent.getPayment().getCreatedAt().toInstant()) - .setPaymentStatus(PaymentStatus.valueOf(paymentCompletedEvent.getPayment().getStatus().name())) - .setFailureMessages(paymentCompletedEvent.getFailureMessages()) - .build(); - } - - public PaymentResponseAvroModel paymentCancelEventToPaymentResponseAvroModel(PaymentCancelledEvent paymentCancelledEvent) { - return PaymentResponseAvroModel.newBuilder() - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setPaymentId(paymentCancelledEvent.getPayment().getId().getValue().toString()) - .setCustomerId(paymentCancelledEvent.getPayment().getCustomerId().getValue().toString()) - .setOrderId(paymentCancelledEvent.getPayment().getOrderId().getValue().toString()) - .setPrice(paymentCancelledEvent.getPayment().getPrice().getAmount()) - .setCreatedAt(paymentCancelledEvent.getPayment().getCreatedAt().toInstant()) - .setPaymentStatus(PaymentStatus.valueOf(paymentCancelledEvent.getPayment().getStatus().name())) - .setFailureMessages(paymentCancelledEvent.getFailureMessages()) - .build(); - } - - public PaymentResponseAvroModel paymentFailedEventToPaymentResponseAvroModel(PaymentFailedEvent paymentFailedEvent) { - return PaymentResponseAvroModel.newBuilder() - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setPaymentId(paymentFailedEvent.getPayment().getId().getValue().toString()) - .setCustomerId(paymentFailedEvent.getPayment().getCustomerId().getValue().toString()) - .setOrderId(paymentFailedEvent.getPayment().getOrderId().getValue().toString()) - .setPrice(paymentFailedEvent.getPayment().getPrice().getAmount()) - .setCreatedAt(paymentFailedEvent.getPayment().getCreatedAt().toInstant()) - .setPaymentStatus(PaymentStatus.valueOf(paymentFailedEvent.getPayment().getStatus().name())) - .setFailureMessages(paymentFailedEvent.getFailureMessages()) - .build(); - } public PaymentRequest paymentRequestAvroModelToPaymentRequest(PaymentRequestAvroModel paymentRequestAvroModel) { return PaymentRequest.builder() @@ -69,6 +26,19 @@ public class PaymentMessagingDataMapper { .build(); } - + public PaymentResponseAvroModel orderEventPayloadToPaymentResponseAvroModel(String sagaId, + OrderEventPayload orderEventPayload) { + return PaymentResponseAvroModel.newBuilder() + .setId(UUID.randomUUID().toString()) + .setSagaId(sagaId) + .setPaymentId(orderEventPayload.getPaymentId()) + .setCustomerId(orderEventPayload.getCustomerId()) + .setOrderId(orderEventPayload.getOrderId()) + .setPrice(orderEventPayload.getPrice()) + .setCreatedAt(orderEventPayload.getCreatedAt().toInstant())//?? + .setPaymentStatus(PaymentStatus.valueOf(orderEventPayload.getPaymentStatus())) + .setFailureMessages(orderEventPayload.getFailureMessages()) + .build(); + } } diff --git a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentCancelledKafkaMessagePublisher.java b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentCancelledKafkaMessagePublisher.java deleted file mode 100644 index 7d92464..0000000 --- a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentCancelledKafkaMessagePublisher.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.food.order.system.payment.messaging.publisher.kafka; - -import com.food.order.system.event.publisher.DomainEventPublisher; -import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel; -import com.food.order.system.kafka.producer.KafkaMessageHelper; -import com.food.order.system.kafka.producer.service.KafkaProducer; -import com.food.order.system.payment.application.service.config.PaymentServiceConfigData; -import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper; -import com.food.order.system.payment.service.domain.event.PaymentCancelledEvent; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class PaymentCancelledKafkaMessagePublisher implements DomainEventPublisher { - - private final PaymentMessagingDataMapper paymentDataMapper; - private final KafkaProducer kafkaProducer; - private final PaymentServiceConfigData paymentServiceConfigData; - private final KafkaMessageHelper kafkaMessageHelper; - - - @Override - public void publish(PaymentCancelledEvent event) { - log.info("Publishing payment cancelled event to kafka"); - var orderId = event.getPayment().getOrderId().getValue().toString(); - try { - var paymentResponseAvroModel = - paymentDataMapper.paymentCancelEventToPaymentResponseAvroModel(event); - - kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(), - orderId, - paymentResponseAvroModel, - kafkaMessageHelper.getKafkaCallBack( - paymentServiceConfigData.getPaymentResponseTopicName(), - paymentResponseAvroModel, - orderId, - "PaymentResponseAvroModel")); - - log.info("Published payment cancelled event to kafka"); - } catch (Exception e) { - log.error("Error while publishing payment cancelled event to kafka", e); - } - } -} diff --git a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentCompletedKafkaMessagePublisher.java b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentCompletedKafkaMessagePublisher.java deleted file mode 100644 index e8de915..0000000 --- a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentCompletedKafkaMessagePublisher.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.food.order.system.payment.messaging.publisher.kafka; - -import com.food.order.system.event.publisher.DomainEventPublisher; -import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel; -import com.food.order.system.kafka.producer.KafkaMessageHelper; -import com.food.order.system.kafka.producer.service.KafkaProducer; -import com.food.order.system.payment.application.service.config.PaymentServiceConfigData; -import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper; -import com.food.order.system.payment.service.domain.event.PaymentCompletedEvent; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class PaymentCompletedKafkaMessagePublisher implements DomainEventPublisher { - - private final PaymentMessagingDataMapper paymentDataMapper; - private final KafkaProducer kafkaProducer; - private final PaymentServiceConfigData paymentServiceConfigData; - - private final KafkaMessageHelper kafkaMessageHelper; - - - @Override - public void publish(PaymentCompletedEvent event) { - log.info("Publishing payment completed event to kafka"); - var orderId = event.getPayment().getOrderId().getValue().toString(); - try { - var paymentResponseAvroModel = - paymentDataMapper.paymentCompletedEventToPaymentResponseAvroModel(event); - - kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(), - orderId, - paymentResponseAvroModel, - kafkaMessageHelper.getKafkaCallBack( - paymentServiceConfigData.getPaymentResponseTopicName(), - paymentResponseAvroModel, - orderId, - "PaymentResponseAvroModel")); - - log.info("Published payment completed event to kafka"); - } catch (Exception e) { - log.error("Error while publishing payment completed event to kafka", e); - } - } -} diff --git a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentEventKafkaPublisher.java b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentEventKafkaPublisher.java new file mode 100644 index 0000000..8986d2c --- /dev/null +++ b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentEventKafkaPublisher.java @@ -0,0 +1,61 @@ +package com.food.order.system.payment.messaging.publisher.kafka; + +import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel; +import com.food.order.system.kafka.producer.KafkaMessageHelper; +import com.food.order.system.kafka.producer.service.KafkaProducer; +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.payment.application.service.config.PaymentServiceConfigData; +import com.food.order.system.payment.application.service.outbox.model.OrderEventPayload; +import com.food.order.system.payment.application.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.payment.application.service.ports.output.message.publisher.PaymentResponseMessagePublisher; +import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.function.BiConsumer; + +@Component +@Slf4j +@RequiredArgsConstructor +public class PaymentEventKafkaPublisher implements PaymentResponseMessagePublisher { + + private final PaymentMessagingDataMapper paymentMessagingDataMapper; + private final KafkaProducer kafkaProducer; + private final PaymentServiceConfigData paymentServiceConfigData; + private final KafkaMessageHelper kafkaMessageHelper; + + @Override + public void publish(OrderOutboxMessage message, + BiConsumer outboxCallback) { + + var payload = + kafkaMessageHelper.getOrderEventPayload(message.getPayload(), OrderEventPayload.class); + + var sagaId = message.getSagaId().toString(); + + log.info("Publishing payment response for order id: {}", sagaId); + + try { + var paymentResponseAvroModel = + paymentMessagingDataMapper.orderEventPayloadToPaymentResponseAvroModel(sagaId,payload); + + kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(), + sagaId, paymentResponseAvroModel, + kafkaMessageHelper.getKafkaCallback( + paymentServiceConfigData.getPaymentResponseTopicName(), + paymentResponseAvroModel, + message, + outboxCallback, + payload.getOrderId(), + "PaymentResponseAvroModel" + )); + + log.info("PaymentResponseAvroModel sent to kafka for order id: {} and saga id: {}", + paymentResponseAvroModel.getOrderId(), sagaId); + } + catch (Exception e) { + log.error("Error while publishing payment response for order id: {}", sagaId, e); + } + } +} diff --git a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentFailedKafkaMessagePublisher.java b/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentFailedKafkaMessagePublisher.java deleted file mode 100644 index 1a599e9..0000000 --- a/payment-service/payment-messaging/src/main/java/com/food/order/system/payment/messaging/publisher/kafka/PaymentFailedKafkaMessagePublisher.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.food.order.system.payment.messaging.publisher.kafka; - -import com.food.order.system.event.publisher.DomainEventPublisher; -import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel; -import com.food.order.system.kafka.producer.KafkaMessageHelper; -import com.food.order.system.kafka.producer.service.KafkaProducer; -import com.food.order.system.payment.application.service.config.PaymentServiceConfigData; -import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper; -import com.food.order.system.payment.service.domain.event.PaymentFailedEvent; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class PaymentFailedKafkaMessagePublisher implements DomainEventPublisher { - - private final PaymentMessagingDataMapper paymentDataMapper; - private final KafkaProducer kafkaProducer; - private final PaymentServiceConfigData paymentServiceConfigData; - - private final KafkaMessageHelper kafkaMessageHelper; - - - @Override - public void publish(PaymentFailedEvent event) { - log.info("Publishing payment failed event to kafka"); - var orderId = event.getPayment().getOrderId().getValue().toString(); - try { - var paymentResponseAvroModel = - paymentDataMapper.paymentFailedEventToPaymentResponseAvroModel(event); - - kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(), - orderId, - paymentResponseAvroModel, - kafkaMessageHelper.getKafkaCallBack( - paymentServiceConfigData.getPaymentResponseTopicName(), - paymentResponseAvroModel, - orderId, - "PaymentResponseAvroModel")); - - log.info("Published payment failed event to kafka"); - } catch (Exception e) { - log.error("Error while publishing payment failed event to kafka", e); - } - } -}