Outbox Message and Scheduler class bug fixed.

This commit is contained in:
Ali CANLI
2022-07-17 12:33:35 +03:00
parent 3626ed70de
commit e1c2a8312b
7 changed files with 47 additions and 90 deletions

View File

@@ -34,7 +34,7 @@ public class RestaurantEntity {
private BigDecimal productPrice;
private Boolean productActive;
private Boolean productAvailable;
@Override
public boolean equals(Object o) {

View File

@@ -13,7 +13,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-broker-1:9092,LISTENER_LOCAL://localhost:19092,PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092,PLAINTEXT://kafka-broker-3:9092,LISTENER_LOCAL://localhost:39092
SCHEMA_REGISTRY_DEBUG: 'true'
networks:
- ${GLOBAL_NETWORK:-kafka}

View File

@@ -1,18 +1,18 @@
package com.food.order.system.outbox.scheduler.approval;
import com.food.order.system.domain.exception.OrderDomainException;
import com.food.order.system.outbox.OutboxScheduler;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.outbox.model.approval.OrderApprovalOutboxMessage;
import com.food.order.system.saga.SagaStatus;
import com.food.order.system.ports.output.message.publisher.restaurantapproval.RestaurantApprovalRequestMessagePublisher;
import com.food.order.system.saga.SagaStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Objects;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -28,28 +28,20 @@ public class RestaurantApprovalOutboxScheduler implements OutboxScheduler {
@Scheduled(fixedDelayString = "${order-service.outbox-scheduler-fixed-rate}",
initialDelayString = "${order-service.outbox-scheduler-initial-delay}")
public void processOutboxMessage() {
log.info("Processing outbox message STARTED !");
var outboxMessageResponse =
Optional<List<OrderApprovalOutboxMessage>> outboxMessagesResponse =
approvalOutboxHelper.getApprovalOutboxMessageByOutboxStatusAndSagaStatus(
OutboxStatus.STARTED,
SagaStatus.STARTED,
SagaStatus.COMPENSATING)
.orElseThrow(
() -> new OrderDomainException("No outbox message found for processing"));
OutboxStatus.STARTED,
SagaStatus.PROCESSING);
if (outboxMessagesResponse.isPresent() && outboxMessagesResponse.get().size() > 0) {
List<OrderApprovalOutboxMessage> outboxMessages = outboxMessagesResponse.get();
log.info("Received {} OrderApprovalOutboxMessage with ids: {}, sending to message bus!",
outboxMessages.size(),
outboxMessages.stream().map(outboxMessage ->
outboxMessage.getId().toString()).collect(Collectors.joining(",")));
outboxMessages.forEach(outboxMessage ->
restaurantApprovalRequestMessagePublisher.publish(outboxMessage, this::updateOutboxStatus));
log.info("{} OrderApprovalOutboxMessage sent to message bus!", outboxMessages.size());
if (Objects.nonNull(outboxMessageResponse) && outboxMessageResponse.size() > 0) {
log.info("Received {} OrderPaymentOutboxMessage with ids : {} , sending message bus !" ,
outboxMessageResponse.size(),
outboxMessageResponse.stream().map(orderPaymentOutboxMessage -> orderPaymentOutboxMessage.getId().toString())
.collect(Collectors.joining(",")));
outboxMessageResponse.forEach(orderPaymentOutboxMessage -> {
restaurantApprovalRequestMessagePublisher.publish
(orderPaymentOutboxMessage,this::updateOutboxStatus);
});
log.info("Processing outbox message completed ! ");
}
}

View File

@@ -1,5 +1,6 @@
package com.food.order.system.messaging.listener.kafka;
import com.food.order.system.domain.exception.OrderNotFoundException;
import com.food.order.system.kafka.consumer.KafkaConsumer;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.order.avro.model.PaymentStatus;
@@ -7,6 +8,7 @@ import com.food.order.system.messaging.mapper.OrderMessagingDataMapper;
import com.food.order.system.ports.input.message.listener.payment.PaymentResponseMessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
@@ -33,20 +35,31 @@ public class PaymentResponseKafkaListener implements KafkaConsumer<PaymentRespon
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offSets) {
log.info("{} number of payment responses received with keys : {} , partitions : {} , offsets : {}",
messages.size(), keys, partitions, offSets);
log.info("{} number of payment responses received with keys:{}, partitions:{} and offsets: {}",
messages.size(),
keys.toString(),
partitions.toString(),
offSets.toString());
messages.forEach(message -> {
if (PaymentStatus.COMPLETED.equals(message.getPaymentStatus())) {
log.info("Processing successful payment response for order id: {}", message.getOrderId());
paymentResponseMessageListener.paymentCompleted(orderMessagingDataMapper.
paymentResponseAvroModelToPaymentResponse(message));
}
else if (PaymentStatus.FAILED.equals(message.getPaymentStatus()) ||
PaymentStatus.CANCELLED.equals(message.getPaymentStatus())) {
log.info("Processing failed payment response for order id: {}", message.getOrderId());
paymentResponseMessageListener.paymentCancelled(orderMessagingDataMapper.
paymentResponseAvroModelToPaymentResponse(message));
messages.forEach(paymentResponseAvroModel -> {
try {
if (PaymentStatus.COMPLETED == paymentResponseAvroModel.getPaymentStatus()) {
log.info("Processing successful payment for order id: {}", paymentResponseAvroModel.getOrderId());
paymentResponseMessageListener.paymentCompleted(orderMessagingDataMapper
.paymentResponseAvroModelToPaymentResponse(paymentResponseAvroModel));
} else if (PaymentStatus.CANCELLED == paymentResponseAvroModel.getPaymentStatus() ||
PaymentStatus.FAILED == paymentResponseAvroModel.getPaymentStatus()) {
log.info("Processing unsuccessful payment for order id: {}", paymentResponseAvroModel.getOrderId());
paymentResponseMessageListener.paymentCancelled(orderMessagingDataMapper
.paymentResponseAvroModelToPaymentResponse(paymentResponseAvroModel));
}
} catch (OptimisticLockingFailureException e) {
//NO-OP for optimistic lock. This means another thread finished the work, do not throw error to prevent reading the data from kafka again!
log.error("Caught optimistic locking exception in PaymentResponseKafkaListener for order id: {}",
paymentResponseAvroModel.getOrderId());
} catch (OrderNotFoundException e) {
//NO-OP for OrderNotFoundException
log.error("No order found for order id: {}", paymentResponseAvroModel.getOrderId());
}
});

View File

@@ -1,11 +1,8 @@
package com.food.order.system.messaging.mapper;
import com.food.order.system.domain.event.OrderCancelledEvent;
import com.food.order.system.domain.event.OrderCreatedEvent;
import com.food.order.system.domain.event.OrderPaidEvent;
import com.food.order.system.kafka.order.avro.model.*;
import com.food.order.system.dto.message.PaymentResponse;
import com.food.order.system.dto.message.RestaurantApprovalResponse;
import com.food.order.system.kafka.order.avro.model.*;
import com.food.order.system.outbox.model.approval.OrderApprovalEventPayload;
import com.food.order.system.outbox.model.payment.OrderPaymentEventPayload;
import com.food.order.system.valueobject.OrderApprovalStatus;
@@ -17,54 +14,9 @@ import java.util.UUID;
@Component
public class OrderMessagingDataMapper {
public PaymentRequestAvroModel orderCreatedEventToPaymentRequestAvroModel(OrderCreatedEvent orderCreatedEvent) {
var order = orderCreatedEvent.getOrder();
return PaymentRequestAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setCustomerId(order.getCustomerId().getValue().toString())
.setOrderId(order.getId().getValue().toString())
.setPrice(order.getPrice().getAmount())
.setCreatedAt(orderCreatedEvent.getCreatedAt().toInstant())
.setPaymentOrderStatus(PaymentOrderStatus.PENDING)
.build();
}
public PaymentRequestAvroModel orderCancelledEventToPaymentRequestAvroModel(OrderCancelledEvent orderCancelledEvent) {
var order = orderCancelledEvent.getOrder();
return PaymentRequestAvroModel.newBuilder()
.setOrderId(order.getId().getValue().toString())
.setSagaId("")
.setCustomerId(order.getCustomerId().getValue().toString())
.setId(UUID.randomUUID().toString())
.setPrice(order.getPrice().getAmount())
.setCreatedAt(orderCancelledEvent.getCreatedAt().toInstant())
.setPaymentOrderStatus(PaymentOrderStatus.CANCELLED)
.build();
}
public RestaurantApprovalRequestAvroModel orderPaidEventToRestaurantApprovalRequestAvroModel(OrderPaidEvent event) {
var order = event.getOrder();
return RestaurantApprovalRequestAvroModel.newBuilder()
.setOrderId(order.getId().getValue().toString())
.setRestaurantId(order.getRestaurantId().getValue().toString())
.setProducts(order.getItems().stream()
.map(item -> Product.newBuilder()
.setId(item.getProduct().getId().getValue().toString())
.setQuantity(item.getQuantity())
.build())
.toList())
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setPrice(order.getPrice().getAmount())
.setCreatedAt(event.getCreatedAt().toInstant())
.setRestaurantOrderStatus(RestaurantOrderStatus.PAID)
.build();
}
public PaymentResponse paymentResponseAvroModelToPaymentResponse(PaymentResponseAvroModel message) {
return PaymentResponse.builder()
.id(message.getId())
.orderId(message.getOrderId())
.sagaId(message.getSagaId())
.paymentId(message.getPaymentId())

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.7.1</version>
<version>2.6.7</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -36,7 +36,7 @@ public class RestaurantDataAccessMapper {
.productId(new ProductId(entity.getProductId()))
.name(entity.getProductName())
.price(new Money(entity.getProductPrice()))
.available(entity.getProductActive())
.available(entity.getProductAvailable())
.build())
.toList();