2 Commits

Author SHA1 Message Date
Ali CANLI
fb9fb739f4 Outbox Message and Scheduler class bug fixed. 2022-07-17 12:51:17 +03:00
Ali CANLI
e1c2a8312b Outbox Message and Scheduler class bug fixed. 2022-07-17 12:33:35 +03:00
8 changed files with 53 additions and 97 deletions

View File

@@ -34,7 +34,7 @@ public class RestaurantEntity {
private BigDecimal productPrice;
private Boolean productActive;
private Boolean productAvailable;
@Override
public boolean equals(Object o) {

View File

@@ -13,7 +13,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-broker-1:9092,LISTENER_LOCAL://localhost:19092,PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092,PLAINTEXT://kafka-broker-3:9092,LISTENER_LOCAL://localhost:39092
SCHEMA_REGISTRY_DEBUG: 'true'
networks:
- ${GLOBAL_NETWORK:-kafka}

View File

@@ -1,18 +1,18 @@
package com.food.order.system.outbox.scheduler.approval;
import com.food.order.system.domain.exception.OrderDomainException;
import com.food.order.system.outbox.OutboxScheduler;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.outbox.model.approval.OrderApprovalOutboxMessage;
import com.food.order.system.saga.SagaStatus;
import com.food.order.system.ports.output.message.publisher.restaurantapproval.RestaurantApprovalRequestMessagePublisher;
import com.food.order.system.saga.SagaStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Objects;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@@ -28,28 +28,20 @@ public class RestaurantApprovalOutboxScheduler implements OutboxScheduler {
@Scheduled(fixedDelayString = "${order-service.outbox-scheduler-fixed-rate}",
initialDelayString = "${order-service.outbox-scheduler-initial-delay}")
public void processOutboxMessage() {
log.info("Processing outbox message STARTED !");
var outboxMessageResponse =
Optional<List<OrderApprovalOutboxMessage>> outboxMessagesResponse =
approvalOutboxHelper.getApprovalOutboxMessageByOutboxStatusAndSagaStatus(
OutboxStatus.STARTED,
SagaStatus.STARTED,
SagaStatus.COMPENSATING)
.orElseThrow(
() -> new OrderDomainException("No outbox message found for processing"));
OutboxStatus.STARTED,
SagaStatus.PROCESSING);
if (outboxMessagesResponse.isPresent() && outboxMessagesResponse.get().size() > 0) {
List<OrderApprovalOutboxMessage> outboxMessages = outboxMessagesResponse.get();
log.info("Received {} OrderApprovalOutboxMessage with ids: {}, sending to message bus!",
outboxMessages.size(),
outboxMessages.stream().map(outboxMessage ->
outboxMessage.getId().toString()).collect(Collectors.joining(",")));
outboxMessages.forEach(outboxMessage ->
restaurantApprovalRequestMessagePublisher.publish(outboxMessage, this::updateOutboxStatus));
log.info("{} OrderApprovalOutboxMessage sent to message bus!", outboxMessages.size());
if (Objects.nonNull(outboxMessageResponse) && outboxMessageResponse.size() > 0) {
log.info("Received {} OrderPaymentOutboxMessage with ids : {} , sending message bus !" ,
outboxMessageResponse.size(),
outboxMessageResponse.stream().map(orderPaymentOutboxMessage -> orderPaymentOutboxMessage.getId().toString())
.collect(Collectors.joining(",")));
outboxMessageResponse.forEach(orderPaymentOutboxMessage -> {
restaurantApprovalRequestMessagePublisher.publish
(orderPaymentOutboxMessage,this::updateOutboxStatus);
});
log.info("Processing outbox message completed ! ");
}
}

View File

@@ -1,5 +1,6 @@
package com.food.order.system.messaging.listener.kafka;
import com.food.order.system.domain.exception.OrderNotFoundException;
import com.food.order.system.kafka.consumer.KafkaConsumer;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.order.avro.model.PaymentStatus;
@@ -7,6 +8,7 @@ import com.food.order.system.messaging.mapper.OrderMessagingDataMapper;
import com.food.order.system.ports.input.message.listener.payment.PaymentResponseMessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
@@ -33,20 +35,31 @@ public class PaymentResponseKafkaListener implements KafkaConsumer<PaymentRespon
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offSets) {
log.info("{} number of payment responses received with keys : {} , partitions : {} , offsets : {}",
messages.size(), keys, partitions, offSets);
log.info("{} number of payment responses received with keys:{}, partitions:{} and offsets: {}",
messages.size(),
keys.toString(),
partitions.toString(),
offSets.toString());
messages.forEach(message -> {
if (PaymentStatus.COMPLETED.equals(message.getPaymentStatus())) {
log.info("Processing successful payment response for order id: {}", message.getOrderId());
paymentResponseMessageListener.paymentCompleted(orderMessagingDataMapper.
paymentResponseAvroModelToPaymentResponse(message));
}
else if (PaymentStatus.FAILED.equals(message.getPaymentStatus()) ||
PaymentStatus.CANCELLED.equals(message.getPaymentStatus())) {
log.info("Processing failed payment response for order id: {}", message.getOrderId());
paymentResponseMessageListener.paymentCancelled(orderMessagingDataMapper.
paymentResponseAvroModelToPaymentResponse(message));
messages.forEach(paymentResponseAvroModel -> {
try {
if (PaymentStatus.COMPLETED == paymentResponseAvroModel.getPaymentStatus()) {
log.info("Processing successful payment for order id: {}", paymentResponseAvroModel.getOrderId());
paymentResponseMessageListener.paymentCompleted(orderMessagingDataMapper
.paymentResponseAvroModelToPaymentResponse(paymentResponseAvroModel));
} else if (PaymentStatus.CANCELLED == paymentResponseAvroModel.getPaymentStatus() ||
PaymentStatus.FAILED == paymentResponseAvroModel.getPaymentStatus()) {
log.info("Processing unsuccessful payment for order id: {}", paymentResponseAvroModel.getOrderId());
paymentResponseMessageListener.paymentCancelled(orderMessagingDataMapper
.paymentResponseAvroModelToPaymentResponse(paymentResponseAvroModel));
}
} catch (OptimisticLockingFailureException e) {
//NO-OP for optimistic lock. This means another thread finished the work, do not throw error to prevent reading the data from kafka again!
log.error("Caught optimistic locking exception in PaymentResponseKafkaListener for order id: {}",
paymentResponseAvroModel.getOrderId());
} catch (OrderNotFoundException e) {
//NO-OP for OrderNotFoundException
log.error("No order found for order id: {}", paymentResponseAvroModel.getOrderId());
}
});

View File

@@ -1,11 +1,8 @@
package com.food.order.system.messaging.mapper;
import com.food.order.system.domain.event.OrderCancelledEvent;
import com.food.order.system.domain.event.OrderCreatedEvent;
import com.food.order.system.domain.event.OrderPaidEvent;
import com.food.order.system.kafka.order.avro.model.*;
import com.food.order.system.dto.message.PaymentResponse;
import com.food.order.system.dto.message.RestaurantApprovalResponse;
import com.food.order.system.kafka.order.avro.model.*;
import com.food.order.system.outbox.model.approval.OrderApprovalEventPayload;
import com.food.order.system.outbox.model.payment.OrderPaymentEventPayload;
import com.food.order.system.valueobject.OrderApprovalStatus;
@@ -17,54 +14,9 @@ import java.util.UUID;
@Component
public class OrderMessagingDataMapper {
public PaymentRequestAvroModel orderCreatedEventToPaymentRequestAvroModel(OrderCreatedEvent orderCreatedEvent) {
var order = orderCreatedEvent.getOrder();
return PaymentRequestAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setCustomerId(order.getCustomerId().getValue().toString())
.setOrderId(order.getId().getValue().toString())
.setPrice(order.getPrice().getAmount())
.setCreatedAt(orderCreatedEvent.getCreatedAt().toInstant())
.setPaymentOrderStatus(PaymentOrderStatus.PENDING)
.build();
}
public PaymentRequestAvroModel orderCancelledEventToPaymentRequestAvroModel(OrderCancelledEvent orderCancelledEvent) {
var order = orderCancelledEvent.getOrder();
return PaymentRequestAvroModel.newBuilder()
.setOrderId(order.getId().getValue().toString())
.setSagaId("")
.setCustomerId(order.getCustomerId().getValue().toString())
.setId(UUID.randomUUID().toString())
.setPrice(order.getPrice().getAmount())
.setCreatedAt(orderCancelledEvent.getCreatedAt().toInstant())
.setPaymentOrderStatus(PaymentOrderStatus.CANCELLED)
.build();
}
public RestaurantApprovalRequestAvroModel orderPaidEventToRestaurantApprovalRequestAvroModel(OrderPaidEvent event) {
var order = event.getOrder();
return RestaurantApprovalRequestAvroModel.newBuilder()
.setOrderId(order.getId().getValue().toString())
.setRestaurantId(order.getRestaurantId().getValue().toString())
.setProducts(order.getItems().stream()
.map(item -> Product.newBuilder()
.setId(item.getProduct().getId().getValue().toString())
.setQuantity(item.getQuantity())
.build())
.toList())
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setPrice(order.getPrice().getAmount())
.setCreatedAt(event.getCreatedAt().toInstant())
.setRestaurantOrderStatus(RestaurantOrderStatus.PAID)
.build();
}
public PaymentResponse paymentResponseAvroModelToPaymentResponse(PaymentResponseAvroModel message) {
return PaymentResponse.builder()
.id(message.getId())
.orderId(message.getOrderId())
.sagaId(message.getSagaId())
.paymentId(message.getPaymentId())

View File

@@ -9,7 +9,6 @@ import com.food.order.system.payment.service.domain.event.PaymentEvent;
import com.food.order.system.payment.service.domain.event.PaymentFailedEvent;
import com.food.order.system.payment.service.domain.valueobject.CreditHistoryId;
import com.food.order.system.payment.service.domain.valueobject.TransactionType;
import com.food.order.system.event.publisher.DomainEventPublisher;
import com.food.order.system.valueobject.Money;
import com.food.order.system.valueobject.PaymentStatus;
import lombok.extern.slf4j.Slf4j;
@@ -79,13 +78,13 @@ public class PaymentDomainServiceImpl implements PaymentDomainService {
var totalDebitHistory = getTotalHistoryAmount(creditHistory, TransactionType.DEBIT);
if (totalDebitHistory.isGreaterThan(totalCreditHistory)) {
failureMessages.add("Customer id " + creditEntry.getCustomerId() + " has insufficient credit");
log.error("Customer id {} has insufficient credit", creditEntry.getCustomerId());
failureMessages.add("Customer id " + creditEntry.getCustomerId().getValue() + " has insufficient credit");
log.error("Customer id {} has insufficient credit", creditEntry.getCustomerId().getValue());
}
if (!creditEntry.getTotalCreditAmount().equals(totalCreditHistory.subtract(totalDebitHistory))) {
failureMessages.add("Customer id " + creditEntry.getCustomerId() + " has total is not equal to credit history");
log.error("Customer id {} has total is not equal to credit history", creditEntry.getCustomerId());
failureMessages.add("Customer id " + creditEntry.getCustomerId().getValue() + " has total is not equal to credit history");
log.error("Customer id {} has total is not equal to credit history", creditEntry.getCustomerId().getValue());
}
}
@@ -115,8 +114,8 @@ public class PaymentDomainServiceImpl implements PaymentDomainService {
private void validateCreditEntry(Payment payment, CreditEntry creditEntry, List<String> failureMessages) {
if(payment.getPrice().isGreaterThan(creditEntry.getTotalCreditAmount())){
failureMessages.add("Customer id "+ payment.getCustomerId() + " , has insufficient credit amount" +
creditEntry.getTotalCreditAmount() + " to pay for order id " + payment.getOrderId());
failureMessages.add("Customer id "+ payment.getCustomerId().getValue() + " , has insufficient credit amount" +
creditEntry.getTotalCreditAmount().getAmount() + " to pay for order id " + payment.getOrderId().getValue());
log.error("Payment price is greater than credit");
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.7.1</version>
<version>2.6.7</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -36,7 +36,7 @@ public class RestaurantDataAccessMapper {
.productId(new ProductId(entity.getProductId()))
.name(entity.getProductName())
.price(new Money(entity.getProductPrice()))
.available(entity.getProductActive())
.available(entity.getProductAvailable())
.build())
.toList();