Outbox Message and Scheduler class implemented part - 1.
This commit is contained in:
@@ -12,5 +12,11 @@
|
||||
<artifactId>outbox</artifactId>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.food.order.system.outbox;
|
||||
|
||||
public interface OutboxScheduler {
|
||||
void processOutboxMessage();
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.food.order.system.outbox.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@Configuration
|
||||
@EnableScheduling
|
||||
public class SchedulerConfig {
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.food.order.system.outbox.order;
|
||||
|
||||
public class SagaConst {
|
||||
|
||||
private SagaConst() {
|
||||
}
|
||||
|
||||
public static final String ORDER_PROCESSING_SAGA = "OrderProcessingSaga";
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -31,6 +31,16 @@
|
||||
<artifactId>spring-boot-starter-validation</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>outbox</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-json</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-tx</artifactId>
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
package com.food.order.sysyem;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.sysyem.dto.create.CreateOrderCommand;
|
||||
import com.food.order.sysyem.dto.create.CreateOrderResponse;
|
||||
import com.food.order.sysyem.helper.OrderCreateHelper;
|
||||
import com.food.order.sysyem.helper.OrderSagaHelper;
|
||||
import com.food.order.sysyem.mapper.OrderDataMapper;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.payment.OrderCreatedPaymentRequestMessagePublisher;
|
||||
import com.food.order.sysyem.outbox.scheduler.payment.PaymentOutboxHelper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@@ -17,14 +22,28 @@ public class OrderCreateCommandHandler {
|
||||
private final OrderCreateHelper orderCreateHelper;
|
||||
private final OrderDataMapper orderDataMapper;
|
||||
|
||||
private final OrderCreatedPaymentRequestMessagePublisher orderCreatedPaymentRequestMessagePublisher;
|
||||
private final OrderSagaHelper orderSagaHelper;
|
||||
private final PaymentOutboxHelper paymentOutboxHelper;
|
||||
|
||||
|
||||
|
||||
@Transactional
|
||||
public CreateOrderResponse createOrder(CreateOrderCommand createOrderCommand) {
|
||||
var persistOrder = orderCreateHelper.persistOrder(createOrderCommand);
|
||||
log.info("createOrder with id: {}", persistOrder.getOrder().getId().getValue());
|
||||
orderCreatedPaymentRequestMessagePublisher.publish(persistOrder);
|
||||
return orderDataMapper.orderToCreateOrderResponse(persistOrder.getOrder(),"Order created successfully");
|
||||
var response = orderDataMapper.orderToCreateOrderResponse(persistOrder.getOrder(),"Order created successfully");
|
||||
|
||||
paymentOutboxHelper.savePaymentOutboxMessage(
|
||||
orderDataMapper.orderCreatedEventToOrderPaymentEventPayload(persistOrder),
|
||||
persistOrder.getOrder().getStatus(),
|
||||
orderSagaHelper.orderStatusToSagaStatus(persistOrder.getOrder().getStatus()),
|
||||
OutboxStatus.STARTED,
|
||||
UUID.randomUUID()
|
||||
);
|
||||
|
||||
log.info("Returning CreateOrderResponse with order id : {}", persistOrder.getOrder().getId());
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -28,7 +28,6 @@ public class OrderCreateHelper {
|
||||
private final CustomerRepository customerRepository;
|
||||
private final RestaurantRepository restaurantRepository;
|
||||
private final OrderDataMapper orderDataMapper;
|
||||
private final DomainEventPublisher<OrderCreatedEvent> publisher;
|
||||
|
||||
@Transactional
|
||||
public OrderCreatedEvent persistOrder(CreateOrderCommand createOrderCommand) {
|
||||
@@ -36,7 +35,8 @@ public class OrderCreateHelper {
|
||||
checkCustomer(createOrderCommand.customerId());
|
||||
Restaurant restaurant = checkRestaurant(createOrderCommand);
|
||||
var order = orderDataMapper.createOrderCommandToOrder(createOrderCommand);
|
||||
var createdEventOrder = orderDomainService.validateAndInitiateOrder(order, restaurant,publisher);
|
||||
var createdEventOrder = orderDomainService.validateAndInitiateOrder
|
||||
(order, restaurant);
|
||||
saveOrder(order);
|
||||
log.info("Created Order Event : {}", createdEventOrder);
|
||||
return createdEventOrder;
|
||||
|
||||
@@ -2,7 +2,9 @@ package com.food.order.sysyem.helper;
|
||||
|
||||
import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.exception.OrderNotFoundException;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.ports.output.repository.OrderRepository;
|
||||
import com.food.order.sysyem.valueobject.OrderStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -23,5 +25,15 @@ public class OrderSagaHelper {
|
||||
orderRepository.save(order);
|
||||
}
|
||||
|
||||
public SagaStatus orderStatusToSagaStatus(OrderStatus orderStatus) {
|
||||
return switch (orderStatus) {
|
||||
case PAID -> SagaStatus.PROCESSING;
|
||||
case APPROVED -> SagaStatus.SUCCEEDED;
|
||||
case CANCELLING -> SagaStatus.COMPENSATING;
|
||||
case CANCELLED -> SagaStatus.COMPENSATED;
|
||||
default -> SagaStatus.STARTED;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -4,15 +4,14 @@ import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.entity.OrderItem;
|
||||
import com.food.order.system.domain.entity.Product;
|
||||
import com.food.order.system.domain.entity.Restaurant;
|
||||
import com.food.order.system.domain.event.OrderCreatedEvent;
|
||||
import com.food.order.system.domain.valueobject.StreetAddress;
|
||||
import com.food.order.sysyem.dto.create.CreateOrderCommand;
|
||||
import com.food.order.sysyem.dto.create.CreateOrderResponse;
|
||||
import com.food.order.sysyem.dto.create.OrderAddress;
|
||||
import com.food.order.sysyem.dto.track.TrackOrderResponse;
|
||||
import com.food.order.sysyem.valueobject.CustomerId;
|
||||
import com.food.order.sysyem.valueobject.Money;
|
||||
import com.food.order.sysyem.valueobject.ProductId;
|
||||
import com.food.order.sysyem.valueobject.RestaurantId;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentEventPayload;
|
||||
import com.food.order.sysyem.valueobject.*;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
@@ -21,6 +20,16 @@ import java.util.UUID;
|
||||
@Component
|
||||
public class OrderDataMapper {
|
||||
|
||||
public OrderPaymentEventPayload orderCreatedEventToOrderPaymentEventPayload(OrderCreatedEvent order) {
|
||||
return OrderPaymentEventPayload.builder()
|
||||
.orderId(order.getOrder().getId().getValue().toString())
|
||||
.customerId(order.getOrder().getCustomerId().getValue().toString())
|
||||
.price(order.getOrder().getPrice().getAmount())
|
||||
.createdAt(order.getCreatedAt())
|
||||
.paymentOrderStatus(PaymentOrderStatus.PENDING.name())
|
||||
.build();
|
||||
}
|
||||
|
||||
public TrackOrderResponse orderToTrackOrderResponse(Order order) {
|
||||
|
||||
return TrackOrderResponse.builder()
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.food.order.sysyem.outbox.model.approval;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
public class OrderApprovalEventPayload {
|
||||
|
||||
@JsonProperty
|
||||
private String orderId;
|
||||
|
||||
@JsonProperty
|
||||
private String restaurantId;
|
||||
|
||||
@JsonProperty
|
||||
private BigDecimal price;
|
||||
|
||||
@JsonProperty
|
||||
private ZonedDateTime createdAt;
|
||||
|
||||
@JsonProperty
|
||||
private String restaurantOrderStatus;
|
||||
|
||||
@JsonProperty
|
||||
private List<OrderApprovalProduct> products;
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.food.order.sysyem.outbox.model.approval;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.valueobject.OrderStatus;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
public class OrderApprovalOutboxMessage {
|
||||
|
||||
private UUID id;
|
||||
|
||||
private UUID sagaId;
|
||||
|
||||
private ZonedDateTime createdAt;
|
||||
|
||||
@Setter
|
||||
private ZonedDateTime processedAt;
|
||||
|
||||
private String type;
|
||||
|
||||
private String payload;
|
||||
|
||||
@Setter
|
||||
private SagaStatus sagaStatus;
|
||||
|
||||
@Setter
|
||||
private OrderStatus orderStatus;
|
||||
|
||||
@Setter
|
||||
private OutboxStatus outboxStatus;
|
||||
|
||||
private int version;
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.food.order.sysyem.outbox.model.approval;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
public class OrderApprovalProduct {
|
||||
|
||||
@JsonProperty
|
||||
private String id;
|
||||
@JsonProperty
|
||||
private Integer quantity;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.food.order.sysyem.outbox.model.payment;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
public class OrderPaymentEventPayload {
|
||||
|
||||
@JsonProperty
|
||||
private String orderId;
|
||||
@JsonProperty
|
||||
private String customerId;
|
||||
@JsonProperty
|
||||
private BigDecimal price;
|
||||
@JsonProperty
|
||||
private ZonedDateTime createdAt;
|
||||
@JsonProperty
|
||||
private String paymentOrderStatus;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.food.order.sysyem.outbox.model.payment;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.valueobject.OrderStatus;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
public class OrderPaymentOutboxMessage {
|
||||
private UUID id;
|
||||
|
||||
private UUID sagaId;
|
||||
|
||||
private ZonedDateTime createdAt;
|
||||
|
||||
@Setter
|
||||
private ZonedDateTime processedAt;
|
||||
|
||||
private String type;
|
||||
|
||||
private String payload;
|
||||
@Setter
|
||||
private SagaStatus sagaStatus;
|
||||
@Setter
|
||||
private OrderStatus orderStatus;
|
||||
@Setter
|
||||
private OutboxStatus outboxStatus;
|
||||
private int version;
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package com.food.order.sysyem.outbox.scheduler.approval;
|
||||
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
import com.food.order.sysyem.ports.output.repository.ApprovalOutboxRepository;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Component
|
||||
public class ApprovalOutboxHelper {
|
||||
|
||||
private final ApprovalOutboxRepository approvalOutboxRepository;
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<List<OrderApprovalOutboxMessage>> getApprovalOutboxMessageByOutboxStatusAndSagaStatus
|
||||
(OutboxStatus outboxStatus, SagaStatus... sagaStatus) {
|
||||
return approvalOutboxRepository.findByTypeAndOutboxStatusAndSagaStatus(
|
||||
ORDER_PROCESSING_SAGA
|
||||
,outboxStatus,
|
||||
sagaStatus);
|
||||
}
|
||||
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<OrderApprovalOutboxMessage> getApprovalOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID sagaId, SagaStatus... sagaStatus) {
|
||||
return approvalOutboxRepository.findByTypeAndSagaIdAndSagaStatus(
|
||||
ORDER_PROCESSING_SAGA
|
||||
,sagaId,
|
||||
sagaStatus);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void save(OrderApprovalOutboxMessage approvalOutboxMessage) {
|
||||
var response = approvalOutboxRepository.save(approvalOutboxMessage);
|
||||
if (Objects.isNull(response)) {
|
||||
throw new OrderDomainException("Failed to save outbox message id : " +
|
||||
approvalOutboxMessage.getId());
|
||||
}
|
||||
log.info("Outbox message id : {} saved successfully", response.getId());
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void deleteApprovalOutboxMessageByOutboxStatusAndSagaStatus(OutboxStatus outboxStatus,
|
||||
SagaStatus... sagaStatus) {
|
||||
|
||||
approvalOutboxRepository.deleteByTypeAndOutboxStatusAndSagaStatus(
|
||||
ORDER_PROCESSING_SAGA,
|
||||
outboxStatus,
|
||||
sagaStatus);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.food.order.sysyem.outbox.scheduler.approval;
|
||||
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.outbox.OutboxScheduler;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RestaurantApprovalOutboxCleaner implements OutboxScheduler {
|
||||
|
||||
private final ApprovalOutboxHelper approvalOutboxHelper;
|
||||
|
||||
@Override
|
||||
@Scheduled(cron = "@midnight")
|
||||
public void processOutboxMessage() {
|
||||
|
||||
var outboxMessageResponse =
|
||||
approvalOutboxHelper.getApprovalOutboxMessageByOutboxStatusAndSagaStatus(
|
||||
OutboxStatus.COMPLETED,
|
||||
SagaStatus.SUCCEEDED,
|
||||
SagaStatus.FAILED,
|
||||
SagaStatus.COMPENSATING).
|
||||
orElseThrow(() -> new OrderDomainException("No outbox message found for processing"));
|
||||
|
||||
if (Objects.nonNull(outboxMessageResponse)) {
|
||||
|
||||
log.info("Received {} OrderPaymentOutboxMessage for clean-up. The Payloads :{}",
|
||||
outboxMessageResponse.size(),
|
||||
outboxMessageResponse.stream().map(OrderApprovalOutboxMessage::getPayload)
|
||||
.collect(Collectors.joining(",")));
|
||||
approvalOutboxHelper.deleteApprovalOutboxMessageByOutboxStatusAndSagaStatus(
|
||||
OutboxStatus.COMPLETED,
|
||||
SagaStatus.SUCCEEDED,
|
||||
SagaStatus.FAILED,
|
||||
SagaStatus.COMPENSATING);
|
||||
log.info("Clean-up completed ! DELETED LOG SIZE : {}", outboxMessageResponse.size());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.food.order.sysyem.outbox.scheduler.approval;
|
||||
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.outbox.OutboxScheduler;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.restaurantapproval.RestaurantApprovalRequestMessagePublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RestaurantApprovalOutboxMessage implements OutboxScheduler {
|
||||
|
||||
private final ApprovalOutboxHelper approvalOutboxHelper;
|
||||
private final RestaurantApprovalRequestMessagePublisher restaurantApprovalRequestMessagePublisher;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
@Scheduled(fixedDelayString = "${order-service.outbox-scheduler-fixed-rate}",
|
||||
initialDelayString = "${order-service.outbox-scheduler-initial-delay}")
|
||||
public void processOutboxMessage() {
|
||||
|
||||
log.info("Processing outbox message STARTED !");
|
||||
|
||||
var outboxMessageResponse =
|
||||
approvalOutboxHelper.getApprovalOutboxMessageByOutboxStatusAndSagaStatus(
|
||||
OutboxStatus.STARTED,
|
||||
SagaStatus.STARTED,
|
||||
SagaStatus.COMPENSATING)
|
||||
.orElseThrow(
|
||||
() -> new OrderDomainException("No outbox message found for processing"));
|
||||
|
||||
if (Objects.nonNull(outboxMessageResponse) && outboxMessageResponse.size() > 0) {
|
||||
|
||||
log.info("Received {} OrderPaymentOutboxMessage with ids : {} , sending message bus !" ,
|
||||
outboxMessageResponse.size(),
|
||||
outboxMessageResponse.stream().map(orderPaymentOutboxMessage -> orderPaymentOutboxMessage.getId().toString())
|
||||
.collect(Collectors.joining(",")));
|
||||
outboxMessageResponse.forEach(orderPaymentOutboxMessage -> {
|
||||
restaurantApprovalRequestMessagePublisher.publish
|
||||
(orderPaymentOutboxMessage,this::updateOutboxStatus);
|
||||
});
|
||||
log.info("Processing outbox message completed ! ");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void updateOutboxStatus(OrderApprovalOutboxMessage orderPaymentOutboxMessage,
|
||||
OutboxStatus outboxStatus) {
|
||||
orderPaymentOutboxMessage.setOutboxStatus(outboxStatus);
|
||||
approvalOutboxHelper.save(orderPaymentOutboxMessage);
|
||||
log.info("Outbox message id : {} updated successfully", orderPaymentOutboxMessage.getId());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.food.order.sysyem.outbox.scheduler.payment;
|
||||
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.outbox.OutboxScheduler;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentOutboxCleaner implements OutboxScheduler {
|
||||
|
||||
private final PaymentOutboxHelper paymentOutboxHelper;
|
||||
|
||||
@Override
|
||||
@Scheduled(cron = "@midnight")
|
||||
public void processOutboxMessage() {
|
||||
|
||||
var outboxMessageResponse =
|
||||
paymentOutboxHelper.getPaymentOutboxMessageByOutboxMessageStatusAndSagaStatus(
|
||||
OutboxStatus.COMPLETED,
|
||||
SagaStatus.SUCCEEDED,
|
||||
SagaStatus.FAILED,
|
||||
SagaStatus.COMPENSATING).
|
||||
orElseThrow(() -> new OrderDomainException("No outbox message found for processing"));
|
||||
|
||||
if (Objects.nonNull(outboxMessageResponse)) {
|
||||
|
||||
log.info("Received {} OrderPaymentOutboxMessage for clean-up. The Payloads :{}",
|
||||
outboxMessageResponse.size(),
|
||||
outboxMessageResponse.stream().map(OrderPaymentOutboxMessage::getPayload)
|
||||
.collect(Collectors.joining(",")));
|
||||
paymentOutboxHelper.deletePaymentOutboxMessageByOutboxStatusAndSagaStatus(
|
||||
OutboxStatus.COMPLETED,
|
||||
SagaStatus.SUCCEEDED,
|
||||
SagaStatus.FAILED,
|
||||
SagaStatus.COMPENSATING);
|
||||
log.info("Clean-up completed ! DELETED LOG SIZE : {}", outboxMessageResponse.size());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
package com.food.order.sysyem.outbox.scheduler.payment;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentEventPayload;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
import com.food.order.sysyem.ports.output.repository.PaymentOutboxRepository;
|
||||
import com.food.order.sysyem.valueobject.OrderStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentOutboxHelper {
|
||||
|
||||
private final PaymentOutboxRepository paymentOutboxRepository;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<List<OrderPaymentOutboxMessage>> getPaymentOutboxMessageByOutboxMessageStatusAndSagaStatus(
|
||||
OutboxStatus outboxStatus, SagaStatus... sagaStatus) {
|
||||
return paymentOutboxRepository.findByTypeAndOutboxStatusAndSagaStatus(
|
||||
ORDER_PROCESSING_SAGA,
|
||||
outboxStatus,
|
||||
sagaStatus);
|
||||
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
public Optional<OrderPaymentOutboxMessage> getPaymentOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID sagaId, SagaStatus... sagaStatus) {
|
||||
return paymentOutboxRepository.findByTypeAndSagaIdAndSagaStatus(
|
||||
ORDER_PROCESSING_SAGA
|
||||
,sagaId,
|
||||
sagaStatus);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void save(OrderPaymentOutboxMessage orderPaymentOutboxMessage) {
|
||||
var response = paymentOutboxRepository.save(orderPaymentOutboxMessage);
|
||||
if (Objects.isNull(response)) {
|
||||
throw new OrderDomainException("Failed to save outbox message id : " +
|
||||
orderPaymentOutboxMessage.getId());
|
||||
}
|
||||
log.info("Outbox message id : {} saved successfully", response.getId());
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void savePaymentOutboxMessage(OrderPaymentEventPayload payload,
|
||||
OrderStatus orderStatus,
|
||||
SagaStatus sagaStatus,
|
||||
OutboxStatus outboxStatus,
|
||||
UUID sagaId) {
|
||||
|
||||
save(OrderPaymentOutboxMessage.builder()
|
||||
.id(UUID.randomUUID())
|
||||
.sagaId(sagaId)
|
||||
.createdAt(payload.getCreatedAt())
|
||||
.type(ORDER_PROCESSING_SAGA)
|
||||
.payload(createPayload(payload))
|
||||
.outboxStatus(outboxStatus)
|
||||
.orderStatus(orderStatus)
|
||||
.sagaStatus(sagaStatus)
|
||||
.build());
|
||||
|
||||
}
|
||||
|
||||
private String createPayload(OrderPaymentEventPayload payload) {
|
||||
try {
|
||||
return objectMapper.writeValueAsString(payload);
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to create payload for outbox message", e);
|
||||
throw new OrderDomainException("Failed to create payload for outbox message");
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void deletePaymentOutboxMessageByOutboxStatusAndSagaStatus(OutboxStatus outboxStatus,
|
||||
SagaStatus... sagaStatus) {
|
||||
|
||||
paymentOutboxRepository.deleteByTypeAndOutboxStatusAndSagaStatus(
|
||||
ORDER_PROCESSING_SAGA,
|
||||
outboxStatus,
|
||||
sagaStatus);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.food.order.sysyem.outbox.scheduler.payment;
|
||||
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.outbox.OutboxScheduler;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.payment.PaymentRequestMessagePublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentOutboxScheduler implements OutboxScheduler {
|
||||
|
||||
private final PaymentOutboxHelper paymentOutboxHelper;
|
||||
private final PaymentRequestMessagePublisher paymentRequestMessagePublisher;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
@Scheduled(fixedDelayString = "${order-service.outbox-scheduler-fixed-rate}",
|
||||
initialDelayString = "${order-service.outbox-scheduler-initial-delay}")
|
||||
public void processOutboxMessage() {
|
||||
|
||||
log.info("Processing outbox message STARTED !");
|
||||
|
||||
var outboxMessageResponse =
|
||||
paymentOutboxHelper.getPaymentOutboxMessageByOutboxMessageStatusAndSagaStatus(
|
||||
OutboxStatus.STARTED,
|
||||
SagaStatus.STARTED,
|
||||
SagaStatus.COMPENSATING)
|
||||
.orElseThrow(
|
||||
() -> new OrderDomainException("No outbox message found for processing"));
|
||||
|
||||
if (Objects.nonNull(outboxMessageResponse) && outboxMessageResponse.size() > 0) {
|
||||
|
||||
log.info("Received {} OrderPaymentOutboxMessage with ids : {} , sending message bus !" ,
|
||||
outboxMessageResponse.size(),
|
||||
outboxMessageResponse.stream().map(orderPaymentOutboxMessage -> orderPaymentOutboxMessage.getId().toString())
|
||||
.collect(Collectors.joining(",")));
|
||||
outboxMessageResponse.forEach(orderPaymentOutboxMessage -> {
|
||||
paymentRequestMessagePublisher.publish(orderPaymentOutboxMessage,this::updateOutboxStatus);
|
||||
});
|
||||
log.info("Processing outbox message completed ! ");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void updateOutboxStatus(OrderPaymentOutboxMessage orderPaymentOutboxMessage,
|
||||
OutboxStatus outboxStatus) {
|
||||
orderPaymentOutboxMessage.setOutboxStatus(outboxStatus);
|
||||
paymentOutboxHelper.save(orderPaymentOutboxMessage);
|
||||
log.info("Outbox message id : {} updated successfully", orderPaymentOutboxMessage.getId());
|
||||
}
|
||||
}
|
||||
@@ -1,8 +0,0 @@
|
||||
package com.food.order.sysyem.ports.output.message.publisher.payment;
|
||||
|
||||
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
|
||||
import com.food.order.system.domain.event.OrderCancelledEvent;
|
||||
|
||||
public interface OrderCancelledPaymentRequestMessagePublisher extends DomainEventPublisher<OrderCancelledEvent> {
|
||||
|
||||
}
|
||||
@@ -1,8 +0,0 @@
|
||||
package com.food.order.sysyem.ports.output.message.publisher.payment;
|
||||
|
||||
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
|
||||
import com.food.order.system.domain.event.OrderCreatedEvent;
|
||||
|
||||
public interface OrderCreatedPaymentRequestMessagePublisher extends DomainEventPublisher<OrderCreatedEvent> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.food.order.sysyem.ports.output.message.publisher.payment;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public interface PaymentRequestMessagePublisher {
|
||||
|
||||
void publish(OrderPaymentOutboxMessage message,
|
||||
BiConsumer<OrderPaymentOutboxMessage, OutboxStatus> outboxCallback);
|
||||
|
||||
}
|
||||
@@ -1,8 +0,0 @@
|
||||
package com.food.order.sysyem.ports.output.message.publisher.restaurantapproval;
|
||||
|
||||
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
|
||||
import com.food.order.system.domain.event.OrderPaidEvent;
|
||||
|
||||
public interface OrderPaidRestaurantRequestMessagePublisher extends DomainEventPublisher<OrderPaidEvent> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.food.order.sysyem.ports.output.message.publisher.restaurantapproval;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public interface RestaurantApprovalRequestMessagePublisher {
|
||||
|
||||
void publish(OrderApprovalOutboxMessage message,
|
||||
BiConsumer<OrderApprovalOutboxMessage, OutboxStatus> outboxCallback);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.food.order.sysyem.ports.output.repository;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
public interface ApprovalOutboxRepository {
|
||||
OrderApprovalOutboxMessage save(OrderApprovalOutboxMessage message);
|
||||
|
||||
Optional<List<OrderApprovalOutboxMessage>> findByTypeAndOutboxStatusAndSagaStatus(String type,
|
||||
OutboxStatus outboxStatus,
|
||||
SagaStatus... sagaStatus);
|
||||
|
||||
Optional<OrderApprovalOutboxMessage> findByTypeAndSagaIdAndSagaStatus(String type,
|
||||
UUID sagaId,
|
||||
SagaStatus... sagaStatus);
|
||||
|
||||
void deleteByTypeAndOutboxStatusAndSagaStatus(String type,
|
||||
OutboxStatus outboxStatus,
|
||||
SagaStatus... sagaStatus);
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.food.order.sysyem.ports.output.repository;
|
||||
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.sysyem.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
public interface PaymentOutboxRepository {
|
||||
|
||||
OrderPaymentOutboxMessage save(OrderPaymentOutboxMessage message);
|
||||
|
||||
Optional<List<OrderPaymentOutboxMessage>> findByTypeAndOutboxStatusAndSagaStatus( String type,
|
||||
OutboxStatus outboxStatus,
|
||||
SagaStatus... sagaStatus);
|
||||
|
||||
Optional<OrderPaymentOutboxMessage> findByTypeAndSagaIdAndSagaStatus(String type,
|
||||
UUID sagaId,
|
||||
SagaStatus... sagaStatus);
|
||||
|
||||
void deleteByTypeAndOutboxStatusAndSagaStatus(String type,
|
||||
OutboxStatus outboxStatus,
|
||||
SagaStatus... sagaStatus);
|
||||
|
||||
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import com.food.order.system.saga.SagaStep;
|
||||
import com.food.order.sysyem.dto.message.RestaurantApprovalResponse;
|
||||
import com.food.order.sysyem.event.EmptyEvent;
|
||||
import com.food.order.sysyem.helper.OrderSagaHelper;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.payment.OrderCancelledPaymentRequestMessagePublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -18,9 +17,10 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
public class OrderApprovalSaga implements SagaStep<RestaurantApprovalResponse, EmptyEvent, OrderCancelledEvent> {
|
||||
|
||||
private final OrderDomainService orderDomainService;
|
||||
private final OrderCancelledPaymentRequestMessagePublisher messagePublisher;
|
||||
private final OrderSagaHelper orderSagaHelper;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public EmptyEvent process(RestaurantApprovalResponse data) {
|
||||
@@ -38,7 +38,9 @@ public class OrderApprovalSaga implements SagaStep<RestaurantApprovalResponse, E
|
||||
public OrderCancelledEvent rollback(RestaurantApprovalResponse data) {
|
||||
log.info("Approving order with id: {}", data.getOrderId());
|
||||
var order = orderSagaHelper.findOrder(data.getOrderId());
|
||||
var cancelEvent = orderDomainService.cancelOrderPayment(order,data.getFailureMessages(),
|
||||
var cancelEvent = orderDomainService.cancelOrderPayment
|
||||
(order,
|
||||
data.getFailureMessages(),
|
||||
messagePublisher);
|
||||
orderSagaHelper.saveOrder(order);
|
||||
log.info("Order cancelled: {}", order);
|
||||
|
||||
@@ -6,7 +6,6 @@ import com.food.order.system.saga.SagaStep;
|
||||
import com.food.order.sysyem.dto.message.PaymentResponse;
|
||||
import com.food.order.sysyem.event.EmptyEvent;
|
||||
import com.food.order.sysyem.helper.OrderSagaHelper;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.restaurantapproval.OrderPaidRestaurantRequestMessagePublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.food.order.system.kafka.producer.KafkaMessageHelper;
|
||||
import com.food.order.system.kafka.producer.service.KafkaProducer;
|
||||
import com.food.order.system.order.messaging.mapper.OrderMessagingDataMapper;
|
||||
import com.food.order.sysyem.config.OrderServiceConfigData;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.payment.OrderCancelledPaymentRequestMessagePublisher;
|
||||
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -14,7 +14,7 @@ import org.springframework.stereotype.Component;
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@Component
|
||||
public class CancelOrderKafkaMessagePublisher implements OrderCancelledPaymentRequestMessagePublisher {
|
||||
public class CancelOrderKafkaMessagePublisher implements DomainEventPublisher<OrderCancelledEvent> {
|
||||
|
||||
private final OrderMessagingDataMapper orderMessagingDataMapper;
|
||||
private final OrderServiceConfigData configData;
|
||||
|
||||
@@ -7,7 +7,7 @@ import com.food.order.system.kafka.producer.KafkaMessageHelper;
|
||||
import com.food.order.system.kafka.producer.service.KafkaProducer;
|
||||
import com.food.order.system.order.messaging.mapper.OrderMessagingDataMapper;
|
||||
import com.food.order.sysyem.config.OrderServiceConfigData;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.payment.OrderCreatedPaymentRequestMessagePublisher;
|
||||
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class CreateOrderKafkaMessagePublisher implements OrderCreatedPaymentRequestMessagePublisher {
|
||||
public class CreateOrderKafkaMessagePublisher implements DomainEventPublisher<OrderCreatedEvent> {
|
||||
|
||||
private final OrderMessagingDataMapper orderMessagingDataMapper;
|
||||
private final OrderServiceConfigData configData;
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.food.order.system.kafka.producer.KafkaMessageHelper;
|
||||
import com.food.order.system.kafka.producer.service.KafkaProducer;
|
||||
import com.food.order.system.order.messaging.mapper.OrderMessagingDataMapper;
|
||||
import com.food.order.sysyem.config.OrderServiceConfigData;
|
||||
import com.food.order.sysyem.ports.output.message.publisher.restaurantapproval.OrderPaidRestaurantRequestMessagePublisher;
|
||||
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -14,7 +14,7 @@ import org.springframework.stereotype.Component;
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PayOrderKafkaMessagePublisher implements OrderPaidRestaurantRequestMessagePublisher {
|
||||
public class PayOrderKafkaMessagePublisher implements DomainEventPublisher<OrderPaidEvent> {
|
||||
|
||||
private final OrderMessagingDataMapper orderMessagingDataMapper;
|
||||
private final OrderServiceConfigData configData;
|
||||
|
||||
Reference in New Issue
Block a user