From e1c2a8312b436624ad05c8ce285bc36d05683bc7 Mon Sep 17 00:00:00 2001 From: Ali CANLI Date: Sun, 17 Jul 2022 12:33:35 +0300 Subject: [PATCH] Outbox Message and Scheduler class bug fixed. --- .../data/access/entity/RestaurantEntity.java | 2 +- .../docker-compose/kafka_cluster.yml | 2 +- .../RestaurantApprovalOutboxScheduler.java | 38 ++++++-------- .../kafka/PaymentResponseKafkaListener.java | 39 +++++++++----- .../mapper/OrderMessagingDataMapper.java | 52 +------------------ pom.xml | 2 +- .../mapper/RestaurantDataAccessMapper.java | 2 +- 7 files changed, 47 insertions(+), 90 deletions(-) diff --git a/common/common-data-access/src/main/java/com/food/order/system/common/data/access/entity/RestaurantEntity.java b/common/common-data-access/src/main/java/com/food/order/system/common/data/access/entity/RestaurantEntity.java index 9a66f30..24300a6 100644 --- a/common/common-data-access/src/main/java/com/food/order/system/common/data/access/entity/RestaurantEntity.java +++ b/common/common-data-access/src/main/java/com/food/order/system/common/data/access/entity/RestaurantEntity.java @@ -34,7 +34,7 @@ public class RestaurantEntity { private BigDecimal productPrice; - private Boolean productActive; + private Boolean productAvailable; @Override public boolean equals(Object o) { diff --git a/infrastructure/docker-compose/kafka_cluster.yml b/infrastructure/docker-compose/kafka_cluster.yml index a8231d1..2f2bce9 100644 --- a/infrastructure/docker-compose/kafka_cluster.yml +++ b/infrastructure/docker-compose/kafka_cluster.yml @@ -13,7 +13,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081 - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092 + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-broker-1:9092,LISTENER_LOCAL://localhost:19092,PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092,PLAINTEXT://kafka-broker-3:9092,LISTENER_LOCAL://localhost:39092 SCHEMA_REGISTRY_DEBUG: 'true' networks: - ${GLOBAL_NETWORK:-kafka} diff --git a/order-service/order-domain/order-application-service/src/main/java/com/food/order/system/outbox/scheduler/approval/RestaurantApprovalOutboxScheduler.java b/order-service/order-domain/order-application-service/src/main/java/com/food/order/system/outbox/scheduler/approval/RestaurantApprovalOutboxScheduler.java index 65469b1..e09d961 100644 --- a/order-service/order-domain/order-application-service/src/main/java/com/food/order/system/outbox/scheduler/approval/RestaurantApprovalOutboxScheduler.java +++ b/order-service/order-domain/order-application-service/src/main/java/com/food/order/system/outbox/scheduler/approval/RestaurantApprovalOutboxScheduler.java @@ -1,18 +1,18 @@ package com.food.order.system.outbox.scheduler.approval; -import com.food.order.system.domain.exception.OrderDomainException; import com.food.order.system.outbox.OutboxScheduler; import com.food.order.system.outbox.OutboxStatus; import com.food.order.system.outbox.model.approval.OrderApprovalOutboxMessage; -import com.food.order.system.saga.SagaStatus; import com.food.order.system.ports.output.message.publisher.restaurantapproval.RestaurantApprovalRequestMessagePublisher; +import com.food.order.system.saga.SagaStatus; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.util.Objects; +import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; @Slf4j @@ -28,28 +28,20 @@ public class RestaurantApprovalOutboxScheduler implements OutboxScheduler { @Scheduled(fixedDelayString = "${order-service.outbox-scheduler-fixed-rate}", initialDelayString = "${order-service.outbox-scheduler-initial-delay}") public void processOutboxMessage() { - - log.info("Processing outbox message STARTED !"); - - var outboxMessageResponse = + Optional> outboxMessagesResponse = approvalOutboxHelper.getApprovalOutboxMessageByOutboxStatusAndSagaStatus( - OutboxStatus.STARTED, - SagaStatus.STARTED, - SagaStatus.COMPENSATING) - .orElseThrow( - () -> new OrderDomainException("No outbox message found for processing")); + OutboxStatus.STARTED, + SagaStatus.PROCESSING); + if (outboxMessagesResponse.isPresent() && outboxMessagesResponse.get().size() > 0) { + List outboxMessages = outboxMessagesResponse.get(); + log.info("Received {} OrderApprovalOutboxMessage with ids: {}, sending to message bus!", + outboxMessages.size(), + outboxMessages.stream().map(outboxMessage -> + outboxMessage.getId().toString()).collect(Collectors.joining(","))); + outboxMessages.forEach(outboxMessage -> + restaurantApprovalRequestMessagePublisher.publish(outboxMessage, this::updateOutboxStatus)); + log.info("{} OrderApprovalOutboxMessage sent to message bus!", outboxMessages.size()); - if (Objects.nonNull(outboxMessageResponse) && outboxMessageResponse.size() > 0) { - - log.info("Received {} OrderPaymentOutboxMessage with ids : {} , sending message bus !" , - outboxMessageResponse.size(), - outboxMessageResponse.stream().map(orderPaymentOutboxMessage -> orderPaymentOutboxMessage.getId().toString()) - .collect(Collectors.joining(","))); - outboxMessageResponse.forEach(orderPaymentOutboxMessage -> { - restaurantApprovalRequestMessagePublisher.publish - (orderPaymentOutboxMessage,this::updateOutboxStatus); - }); - log.info("Processing outbox message completed ! "); } } diff --git a/order-service/order-messaging/src/main/java/com/food/order/system/messaging/listener/kafka/PaymentResponseKafkaListener.java b/order-service/order-messaging/src/main/java/com/food/order/system/messaging/listener/kafka/PaymentResponseKafkaListener.java index c4f5714..cbd2ef5 100644 --- a/order-service/order-messaging/src/main/java/com/food/order/system/messaging/listener/kafka/PaymentResponseKafkaListener.java +++ b/order-service/order-messaging/src/main/java/com/food/order/system/messaging/listener/kafka/PaymentResponseKafkaListener.java @@ -1,5 +1,6 @@ package com.food.order.system.messaging.listener.kafka; +import com.food.order.system.domain.exception.OrderNotFoundException; import com.food.order.system.kafka.consumer.KafkaConsumer; import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel; import com.food.order.system.kafka.order.avro.model.PaymentStatus; @@ -7,6 +8,7 @@ import com.food.order.system.messaging.mapper.OrderMessagingDataMapper; import com.food.order.system.ports.input.message.listener.payment.PaymentResponseMessageListener; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; @@ -33,20 +35,31 @@ public class PaymentResponseKafkaListener implements KafkaConsumer partitions, @Header(KafkaHeaders.OFFSET) List offSets) { - log.info("{} number of payment responses received with keys : {} , partitions : {} , offsets : {}", - messages.size(), keys, partitions, offSets); + log.info("{} number of payment responses received with keys:{}, partitions:{} and offsets: {}", + messages.size(), + keys.toString(), + partitions.toString(), + offSets.toString()); - messages.forEach(message -> { - if (PaymentStatus.COMPLETED.equals(message.getPaymentStatus())) { - log.info("Processing successful payment response for order id: {}", message.getOrderId()); - paymentResponseMessageListener.paymentCompleted(orderMessagingDataMapper. - paymentResponseAvroModelToPaymentResponse(message)); - } - else if (PaymentStatus.FAILED.equals(message.getPaymentStatus()) || - PaymentStatus.CANCELLED.equals(message.getPaymentStatus())) { - log.info("Processing failed payment response for order id: {}", message.getOrderId()); - paymentResponseMessageListener.paymentCancelled(orderMessagingDataMapper. - paymentResponseAvroModelToPaymentResponse(message)); + messages.forEach(paymentResponseAvroModel -> { + try { + if (PaymentStatus.COMPLETED == paymentResponseAvroModel.getPaymentStatus()) { + log.info("Processing successful payment for order id: {}", paymentResponseAvroModel.getOrderId()); + paymentResponseMessageListener.paymentCompleted(orderMessagingDataMapper + .paymentResponseAvroModelToPaymentResponse(paymentResponseAvroModel)); + } else if (PaymentStatus.CANCELLED == paymentResponseAvroModel.getPaymentStatus() || + PaymentStatus.FAILED == paymentResponseAvroModel.getPaymentStatus()) { + log.info("Processing unsuccessful payment for order id: {}", paymentResponseAvroModel.getOrderId()); + paymentResponseMessageListener.paymentCancelled(orderMessagingDataMapper + .paymentResponseAvroModelToPaymentResponse(paymentResponseAvroModel)); + } + } catch (OptimisticLockingFailureException e) { + //NO-OP for optimistic lock. This means another thread finished the work, do not throw error to prevent reading the data from kafka again! + log.error("Caught optimistic locking exception in PaymentResponseKafkaListener for order id: {}", + paymentResponseAvroModel.getOrderId()); + } catch (OrderNotFoundException e) { + //NO-OP for OrderNotFoundException + log.error("No order found for order id: {}", paymentResponseAvroModel.getOrderId()); } }); diff --git a/order-service/order-messaging/src/main/java/com/food/order/system/messaging/mapper/OrderMessagingDataMapper.java b/order-service/order-messaging/src/main/java/com/food/order/system/messaging/mapper/OrderMessagingDataMapper.java index 6400fbb..97f5fde 100644 --- a/order-service/order-messaging/src/main/java/com/food/order/system/messaging/mapper/OrderMessagingDataMapper.java +++ b/order-service/order-messaging/src/main/java/com/food/order/system/messaging/mapper/OrderMessagingDataMapper.java @@ -1,11 +1,8 @@ package com.food.order.system.messaging.mapper; -import com.food.order.system.domain.event.OrderCancelledEvent; -import com.food.order.system.domain.event.OrderCreatedEvent; -import com.food.order.system.domain.event.OrderPaidEvent; -import com.food.order.system.kafka.order.avro.model.*; import com.food.order.system.dto.message.PaymentResponse; import com.food.order.system.dto.message.RestaurantApprovalResponse; +import com.food.order.system.kafka.order.avro.model.*; import com.food.order.system.outbox.model.approval.OrderApprovalEventPayload; import com.food.order.system.outbox.model.payment.OrderPaymentEventPayload; import com.food.order.system.valueobject.OrderApprovalStatus; @@ -17,54 +14,9 @@ import java.util.UUID; @Component public class OrderMessagingDataMapper { - public PaymentRequestAvroModel orderCreatedEventToPaymentRequestAvroModel(OrderCreatedEvent orderCreatedEvent) { - var order = orderCreatedEvent.getOrder(); - return PaymentRequestAvroModel.newBuilder() - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setCustomerId(order.getCustomerId().getValue().toString()) - .setOrderId(order.getId().getValue().toString()) - .setPrice(order.getPrice().getAmount()) - .setCreatedAt(orderCreatedEvent.getCreatedAt().toInstant()) - .setPaymentOrderStatus(PaymentOrderStatus.PENDING) - .build(); - } - - public PaymentRequestAvroModel orderCancelledEventToPaymentRequestAvroModel(OrderCancelledEvent orderCancelledEvent) { - var order = orderCancelledEvent.getOrder(); - return PaymentRequestAvroModel.newBuilder() - .setOrderId(order.getId().getValue().toString()) - .setSagaId("") - .setCustomerId(order.getCustomerId().getValue().toString()) - .setId(UUID.randomUUID().toString()) - .setPrice(order.getPrice().getAmount()) - .setCreatedAt(orderCancelledEvent.getCreatedAt().toInstant()) - .setPaymentOrderStatus(PaymentOrderStatus.CANCELLED) - .build(); - } - - - public RestaurantApprovalRequestAvroModel orderPaidEventToRestaurantApprovalRequestAvroModel(OrderPaidEvent event) { - var order = event.getOrder(); - return RestaurantApprovalRequestAvroModel.newBuilder() - .setOrderId(order.getId().getValue().toString()) - .setRestaurantId(order.getRestaurantId().getValue().toString()) - .setProducts(order.getItems().stream() - .map(item -> Product.newBuilder() - .setId(item.getProduct().getId().getValue().toString()) - .setQuantity(item.getQuantity()) - .build()) - .toList()) - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setPrice(order.getPrice().getAmount()) - .setCreatedAt(event.getCreatedAt().toInstant()) - .setRestaurantOrderStatus(RestaurantOrderStatus.PAID) - .build(); - } - public PaymentResponse paymentResponseAvroModelToPaymentResponse(PaymentResponseAvroModel message) { return PaymentResponse.builder() + .id(message.getId()) .orderId(message.getOrderId()) .sagaId(message.getSagaId()) .paymentId(message.getPaymentId()) diff --git a/pom.xml b/pom.xml index becb702..1a2cb8f 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ spring-boot-starter-parent org.springframework.boot - 2.7.1 + 2.6.7 4.0.0 diff --git a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/mapper/RestaurantDataAccessMapper.java b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/mapper/RestaurantDataAccessMapper.java index e65f1c3..37d8e53 100644 --- a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/mapper/RestaurantDataAccessMapper.java +++ b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/mapper/RestaurantDataAccessMapper.java @@ -36,7 +36,7 @@ public class RestaurantDataAccessMapper { .productId(new ProductId(entity.getProductId())) .name(entity.getProductName()) .price(new Money(entity.getProductPrice())) - .available(entity.getProductActive()) + .available(entity.getProductAvailable()) .build()) .toList();