Outbox Message and Scheduler class implemented part - 4.
This commit is contained in:
@@ -36,6 +36,11 @@
|
||||
<artifactId>order-messaging</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package com.food.order.system;
|
||||
|
||||
|
||||
import com.food.order.system.data.access.outbox.payment.entity.PaymentOutboxEntity;
|
||||
import com.food.order.system.data.access.outbox.payment.repository.PaymentOutboxJpaRepository;
|
||||
import com.food.order.system.dto.message.PaymentResponse;
|
||||
import com.food.order.system.saga.OrderPaymentSaga;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.system.valueobject.PaymentStatus;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.dao.OptimisticLockingFailureException;
|
||||
import org.springframework.test.context.jdbc.Sql;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.springframework.test.context.jdbc.Sql.ExecutionPhase.AFTER_TEST_METHOD;
|
||||
|
||||
@Slf4j
|
||||
@SpringBootTest(classes = OrderServiceApplication.class)
|
||||
@Sql(value = {"classpath:sql/OrderPaymentSagaTestSetUp.sql"})
|
||||
@Sql(value = {"classpath:sql/OrderPaymentSagaTestCleanUp.sql"}, executionPhase = AFTER_TEST_METHOD)
|
||||
class OrderPaymentSagaTest {
|
||||
|
||||
@Autowired
|
||||
private OrderPaymentSaga orderPaymentSaga;
|
||||
|
||||
@Autowired
|
||||
private PaymentOutboxJpaRepository paymentOutboxJpaRepository;
|
||||
|
||||
private final UUID SAGA_ID = UUID.fromString("15a497c1-0f4b-4eff-b9f4-c402c8c07afa");
|
||||
private final UUID ORDER_ID = UUID.fromString("d215b5f8-0249-4dc5-89a3-51fd148cfb17");
|
||||
private final UUID CUSTOMER_ID = UUID.fromString("d215b5f8-0249-4dc5-89a3-51fd148cfb41");
|
||||
private final UUID PAYMENT_ID = UUID.randomUUID();
|
||||
private final BigDecimal PRICE = new BigDecimal("100");
|
||||
|
||||
@Test
|
||||
void testDoublePayment() {
|
||||
orderPaymentSaga.process(getPaymentResponse());
|
||||
orderPaymentSaga.process(getPaymentResponse());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoublePaymentWithThreads() throws InterruptedException {
|
||||
Thread thread1 = new Thread(() -> orderPaymentSaga.process(getPaymentResponse()));
|
||||
Thread thread2 = new Thread(() -> orderPaymentSaga.process(getPaymentResponse()));
|
||||
|
||||
thread1.start();
|
||||
thread2.start();
|
||||
|
||||
thread1.join();
|
||||
thread2.join();
|
||||
|
||||
assertPaymentOutbox();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoublePaymentWithLatch() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
|
||||
Thread thread1 = new Thread(() -> {
|
||||
try {
|
||||
orderPaymentSaga.process(getPaymentResponse());
|
||||
} catch (OptimisticLockingFailureException e) {
|
||||
log.error("OptimisticLockingFailureException occurred for thread1");
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Thread thread2 = new Thread(() -> {
|
||||
try {
|
||||
orderPaymentSaga.process(getPaymentResponse());
|
||||
} catch (OptimisticLockingFailureException e) {
|
||||
log.error("OptimisticLockingFailureException occurred for thread2");
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
thread1.start();
|
||||
thread2.start();
|
||||
|
||||
latch.await();
|
||||
|
||||
assertPaymentOutbox();
|
||||
|
||||
}
|
||||
|
||||
private void assertPaymentOutbox() {
|
||||
Optional<PaymentOutboxEntity> paymentOutboxEntity =
|
||||
paymentOutboxJpaRepository.findByTypeAndSagaIdAndSagaStatusIn(ORDER_PROCESSING_SAGA, SAGA_ID,
|
||||
List.of(SagaStatus.PROCESSING));
|
||||
assertTrue(paymentOutboxEntity.isPresent());
|
||||
}
|
||||
|
||||
private PaymentResponse getPaymentResponse() {
|
||||
return PaymentResponse.builder()
|
||||
.id(UUID.randomUUID().toString())
|
||||
.sagaId(SAGA_ID.toString())
|
||||
.paymentStatus(PaymentStatus.COMPLETED)
|
||||
.paymentId(PAYMENT_ID.toString())
|
||||
.orderId(ORDER_ID.toString())
|
||||
.customerId(CUSTOMER_ID.toString())
|
||||
.price(PRICE)
|
||||
.createdAt(Instant.now())
|
||||
.failureMessages(new ArrayList<>())
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
delete from "order".orders where id = 'd215b5f8-0249-4dc5-89a3-51fd148cfb17';
|
||||
|
||||
delete from "order".payment_outbox where id = '8904808e-286f-449b-9b56-b63ba8351cf2';
|
||||
@@ -0,0 +1,15 @@
|
||||
insert into "order".orders(id, customer_id, restaurant_id, tracking_id, price, order_status, failure_messages)
|
||||
values('d215b5f8-0249-4dc5-89a3-51fd148cfb17', 'd215b5f8-0249-4dc5-89a3-51fd148cfb41', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45',
|
||||
'd215b5f8-0249-4dc5-89a3-51fd148cfb18', 100.00, 'PENDING', '');
|
||||
|
||||
insert into "order".order_items(id, order_id, product_id, price, quantity, sub_total)
|
||||
values(1, 'd215b5f8-0249-4dc5-89a3-51fd148cfb17', 'd215b5f8-0249-4dc5-89a3-51fd148cfb47', 100.00, 1, 100.00);
|
||||
|
||||
insert into "order".order_address(id, order_id, street, postal_code, city)
|
||||
values('d215b5f8-0249-4dc5-89a3-51fd148cfb15', 'd215b5f8-0249-4dc5-89a3-51fd148cfb17', 'test street', '1000AA', 'test city');
|
||||
|
||||
insert into "order".payment_outbox(id, saga_id, created_at, type, payload, outbox_status, saga_status, order_status, version)
|
||||
values ('8904808e-286f-449b-9b56-b63ba8351cf2', '15a497c1-0f4b-4eff-b9f4-c402c8c07afa', current_timestamp, 'OrderProcessingSaga',
|
||||
'{"price": 100, "orderId": "ef471dac-ec22-43a7-a3f4-9d04195567a5", "createdAt": "2022-01-07T16:21:42.917756+01:00",
|
||||
"customerId": "d215b5f8-0249-4dc5-89a3-51fd148cfb41", "paymentOrderStatus": "PENDING"}',
|
||||
'STARTED', 'STARTED', 'PENDING', 0);
|
||||
@@ -1,16 +1,16 @@
|
||||
package com.food.order.system.data.access.order.adapter;
|
||||
|
||||
import com.food.order.system.ports.output.repository.OrderRepository;
|
||||
import com.food.order.system.data.access.order.mapper.OrderDataAccessMapper;
|
||||
import com.food.order.system.data.access.order.repository.OrderJpaRepository;
|
||||
import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.valueobject.TrackingId;
|
||||
import com.food.order.system.ports.output.repository.OrderRepository;
|
||||
import com.food.order.system.valueobject.OrderId;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@@ -28,8 +28,8 @@ public class OrderRepositoryImpl implements OrderRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Order> findById(String orderId) {
|
||||
return orderJpaRepository.findById(UUID.fromString(orderId))
|
||||
public Optional<Order> findById(OrderId orderId) {
|
||||
return orderJpaRepository.findById(orderId.getValue())
|
||||
.map(orderDataAccessMapper::orderEntityToOrder);
|
||||
}
|
||||
|
||||
|
||||
@@ -4,11 +4,14 @@ import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.exception.OrderNotFoundException;
|
||||
import com.food.order.system.ports.output.repository.OrderRepository;
|
||||
import com.food.order.system.saga.SagaStatus;
|
||||
import com.food.order.system.valueobject.OrderId;
|
||||
import com.food.order.system.valueobject.OrderStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@@ -17,7 +20,7 @@ public class OrderSagaHelper {
|
||||
private final OrderRepository orderRepository;
|
||||
|
||||
public Order findOrder(String orderId) {
|
||||
return orderRepository.findById(orderId)
|
||||
return orderRepository.findById(new OrderId(UUID.fromString(orderId)))
|
||||
.orElseThrow(() -> new OrderNotFoundException("Order not found -> Order id :" + orderId));
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.food.order.system.ports.output.repository;
|
||||
|
||||
import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.valueobject.TrackingId;
|
||||
import com.food.order.system.valueobject.OrderId;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -9,7 +10,7 @@ public interface OrderRepository {
|
||||
|
||||
Order save(Order order);
|
||||
|
||||
Optional<Order> findById(String trackingId);
|
||||
Optional<Order> findById(OrderId trackingId);
|
||||
|
||||
Optional<Order> findByTrackingId(TrackingId trackingId);
|
||||
|
||||
|
||||
@@ -4,13 +4,14 @@ import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.event.OrderCancelledEvent;
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.domain.service.OrderDomainService;
|
||||
import com.food.order.system.dto.message.RestaurantApprovalResponse;
|
||||
import com.food.order.system.helper.OrderSagaHelper;
|
||||
import com.food.order.system.mapper.OrderDataMapper;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
import com.food.order.system.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
import com.food.order.system.outbox.scheduler.approval.ApprovalOutboxHelper;
|
||||
import com.food.order.system.outbox.scheduler.payment.PaymentOutboxHelper;
|
||||
import com.food.order.system.dto.message.RestaurantApprovalResponse;
|
||||
import com.food.order.system.valueobject.OrderStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -19,6 +20,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.food.order.system.DomainConstants.UTC;
|
||||
@@ -35,107 +37,96 @@ public class OrderApprovalSaga implements SagaStep<RestaurantApprovalResponse> {
|
||||
private final ApprovalOutboxHelper approvalOutboxHelper;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void process(RestaurantApprovalResponse data) {
|
||||
|
||||
var messageResponse =
|
||||
public void process(RestaurantApprovalResponse restaurantApprovalResponse) {
|
||||
var orderApprovalOutboxMessageResponse =
|
||||
approvalOutboxHelper.getApprovalOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(data.getSagaId()),
|
||||
SagaStatus.PROCESSING)
|
||||
.orElseThrow(() -> {
|
||||
log.error("Approval outbox message not found for saga id: {}", data.getSagaId());
|
||||
return new OrderDomainException("Approval outbox message not found for saga id: " + data.getSagaId());
|
||||
});
|
||||
UUID.fromString(restaurantApprovalResponse.getSagaId()),
|
||||
SagaStatus.PROCESSING).orElseThrow(
|
||||
() -> new OrderDomainException("OrderApprovalSaga: Order approval outbox message not found"));
|
||||
|
||||
var order = approveOrder(data);
|
||||
var order = approveOrder(restaurantApprovalResponse);
|
||||
|
||||
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(order.getStatus());
|
||||
|
||||
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(messageResponse,order.getStatus(), sagaStatus));
|
||||
|
||||
approvalOutboxHelper.save(getUpdatedPaymentOutboxMessage(messageResponse.getSagaId(),
|
||||
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(orderApprovalOutboxMessageResponse,
|
||||
order.getStatus(), sagaStatus));
|
||||
|
||||
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(restaurantApprovalResponse.getSagaId(),
|
||||
order.getStatus(), sagaStatus));
|
||||
|
||||
|
||||
log.info("Order approved: {}", order);
|
||||
log.info("Order with id: {} is approved", order.getId().getValue());
|
||||
}
|
||||
|
||||
private OrderApprovalOutboxMessage getUpdatedPaymentOutboxMessage(UUID sagaId,
|
||||
OrderStatus status,
|
||||
SagaStatus sagaStatus) {
|
||||
var message = approvalOutboxHelper.getApprovalOutboxMessageBySagaIdAndSagaStatus(
|
||||
sagaId,
|
||||
SagaStatus.PROCESSING)
|
||||
.orElseThrow(() -> {
|
||||
log.error("Approval outbox message not found for saga id: {}", sagaId);
|
||||
return new OrderDomainException("Approval outbox message not found for saga id: " + sagaId);
|
||||
});
|
||||
@Override
|
||||
@Transactional
|
||||
public void rollback(RestaurantApprovalResponse restaurantApprovalResponse) {
|
||||
var orderApprovalOutboxMessageResponse =
|
||||
approvalOutboxHelper.getApprovalOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(restaurantApprovalResponse.getSagaId()),
|
||||
SagaStatus.PROCESSING).orElseThrow(
|
||||
() -> new OrderDomainException("OrderApprovalSaga: Order approval outbox message not found"));
|
||||
|
||||
message.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
message.setSagaStatus(sagaStatus);
|
||||
message.setOrderStatus(status);
|
||||
return message;
|
||||
var domainEvent = rollbackOrder(restaurantApprovalResponse);
|
||||
|
||||
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(domainEvent.getOrder().getStatus());
|
||||
|
||||
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(orderApprovalOutboxMessageResponse,
|
||||
domainEvent.getOrder().getStatus(), sagaStatus));
|
||||
|
||||
paymentOutboxHelper.savePaymentOutboxMessage(orderDataMapper
|
||||
.orderCancelledEventToOrderPaymentEventPayload(domainEvent),
|
||||
domainEvent.getOrder().getStatus(),
|
||||
sagaStatus,
|
||||
OutboxStatus.STARTED,
|
||||
UUID.fromString(restaurantApprovalResponse.getSagaId()));
|
||||
|
||||
log.info("Order with id: {} is cancelling", domainEvent.getOrder().getId().getValue());
|
||||
}
|
||||
|
||||
private OrderApprovalOutboxMessage getUpdatedApprovalOutboxMessage(OrderApprovalOutboxMessage messageResponse,
|
||||
OrderStatus status,
|
||||
SagaStatus sagaStatus) {
|
||||
messageResponse.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
messageResponse.setSagaStatus(sagaStatus);
|
||||
messageResponse.setOrderStatus(status);
|
||||
return messageResponse;
|
||||
}
|
||||
|
||||
|
||||
private Order approveOrder(RestaurantApprovalResponse data) {
|
||||
var order = orderSagaHelper.findOrder(data.getOrderId());
|
||||
private Order approveOrder(RestaurantApprovalResponse restaurantApprovalResponse) {
|
||||
log.info("Approving order with id: {}", restaurantApprovalResponse.getOrderId());
|
||||
Order order = orderSagaHelper.findOrder(restaurantApprovalResponse.getOrderId());
|
||||
orderDomainService.approve(order);
|
||||
orderSagaHelper.saveOrder(order);
|
||||
return order;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void rollback(RestaurantApprovalResponse data) {
|
||||
|
||||
var message = approvalOutboxHelper.getApprovalOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(data.getSagaId()),
|
||||
SagaStatus.PROCESSING)
|
||||
.orElseThrow(
|
||||
() -> {
|
||||
log.error("Approval outbox message not found for saga id: {}", data.getSagaId());
|
||||
return new OrderDomainException("Approval outbox message not found for saga id: " +
|
||||
data.getSagaId());
|
||||
}
|
||||
);
|
||||
|
||||
var event = rollbackOrder(data);
|
||||
|
||||
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(event.getOrder().getStatus());
|
||||
|
||||
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(message, event.getOrder().getStatus(), sagaStatus));
|
||||
|
||||
paymentOutboxHelper.savePaymentOutboxMessage(
|
||||
orderDataMapper.orderCancelledEventToOrderPaymentEventPayload(event),
|
||||
event.getOrder().getStatus(),
|
||||
sagaStatus,
|
||||
OutboxStatus.STARTED,
|
||||
message.getSagaId()
|
||||
);
|
||||
|
||||
log.info("Order cancelled event id: {}", event.getOrder().getId());
|
||||
private OrderApprovalOutboxMessage getUpdatedApprovalOutboxMessage(OrderApprovalOutboxMessage
|
||||
orderApprovalOutboxMessage,
|
||||
OrderStatus
|
||||
orderStatus,
|
||||
SagaStatus
|
||||
sagaStatus) {
|
||||
orderApprovalOutboxMessage.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
orderApprovalOutboxMessage.setOrderStatus(orderStatus);
|
||||
orderApprovalOutboxMessage.setSagaStatus(sagaStatus);
|
||||
return orderApprovalOutboxMessage;
|
||||
}
|
||||
|
||||
private OrderCancelledEvent rollbackOrder(RestaurantApprovalResponse data) {
|
||||
var order = orderSagaHelper.findOrder(data.getOrderId());
|
||||
var event = orderDomainService.cancelOrderPayment(
|
||||
order,
|
||||
data.getFailureMessages());
|
||||
private OrderPaymentOutboxMessage getUpdatedPaymentOutboxMessage(String sagaId,
|
||||
OrderStatus orderStatus,
|
||||
SagaStatus sagaStatus) {
|
||||
Optional<OrderPaymentOutboxMessage> orderPaymentOutboxMessageResponse = paymentOutboxHelper
|
||||
.getPaymentOutboxMessageBySagaIdAndSagaStatus(UUID.fromString(sagaId), SagaStatus.PROCESSING);
|
||||
if (orderPaymentOutboxMessageResponse.isEmpty()) {
|
||||
throw new OrderDomainException("Payment outbox message cannot be found in " +
|
||||
SagaStatus.PROCESSING.name() + " state");
|
||||
}
|
||||
OrderPaymentOutboxMessage orderPaymentOutboxMessage = orderPaymentOutboxMessageResponse.get();
|
||||
orderPaymentOutboxMessage.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
orderPaymentOutboxMessage.setOrderStatus(orderStatus);
|
||||
orderPaymentOutboxMessage.setSagaStatus(sagaStatus);
|
||||
return orderPaymentOutboxMessage;
|
||||
}
|
||||
|
||||
private OrderCancelledEvent rollbackOrder(RestaurantApprovalResponse restaurantApprovalResponse) {
|
||||
log.info("Cancelling order with id: {}", restaurantApprovalResponse.getOrderId());
|
||||
Order order = orderSagaHelper.findOrder(restaurantApprovalResponse.getOrderId());
|
||||
OrderCancelledEvent domainEvent = orderDomainService.cancelOrderPayment(order,
|
||||
restaurantApprovalResponse.getFailureMessages());
|
||||
orderSagaHelper.saveOrder(order);
|
||||
return event;
|
||||
return domainEvent;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,15 +3,18 @@ package com.food.order.system.saga;
|
||||
import com.food.order.system.domain.entity.Order;
|
||||
import com.food.order.system.domain.event.OrderPaidEvent;
|
||||
import com.food.order.system.domain.exception.OrderDomainException;
|
||||
import com.food.order.system.domain.exception.OrderNotFoundException;
|
||||
import com.food.order.system.domain.service.OrderDomainService;
|
||||
import com.food.order.system.dto.message.PaymentResponse;
|
||||
import com.food.order.system.helper.OrderSagaHelper;
|
||||
import com.food.order.system.mapper.OrderDataMapper;
|
||||
import com.food.order.system.outbox.OutboxStatus;
|
||||
import com.food.order.system.outbox.model.approval.OrderApprovalOutboxMessage;
|
||||
import com.food.order.system.outbox.model.payment.OrderPaymentOutboxMessage;
|
||||
import com.food.order.system.helper.OrderSagaHelper;
|
||||
import com.food.order.system.mapper.OrderDataMapper;
|
||||
import com.food.order.system.outbox.scheduler.approval.ApprovalOutboxHelper;
|
||||
import com.food.order.system.outbox.scheduler.payment.PaymentOutboxHelper;
|
||||
import com.food.order.system.ports.output.repository.OrderRepository;
|
||||
import com.food.order.system.valueobject.OrderId;
|
||||
import com.food.order.system.valueobject.OrderStatus;
|
||||
import com.food.order.system.valueobject.PaymentStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@@ -21,6 +24,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.food.order.system.DomainConstants.UTC;
|
||||
@@ -29,92 +33,125 @@ import static com.food.order.system.DomainConstants.UTC;
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class OrderPaymentSaga implements SagaStep<PaymentResponse> {
|
||||
|
||||
private final OrderDomainService orderDomainService;
|
||||
private final OrderRepository orderRepository;
|
||||
private final PaymentOutboxHelper paymentOutboxHelper;
|
||||
private final ApprovalOutboxHelper approvalOutboxHelper;
|
||||
private final OrderSagaHelper orderSagaHelper;
|
||||
private final OrderDataMapper orderDataMapper;
|
||||
private final ApprovalOutboxHelper approvalOutboxHelper;
|
||||
private final PaymentOutboxHelper paymentOutboxHelper;
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void process(PaymentResponse data) {
|
||||
|
||||
var messageResponse =
|
||||
public void process(PaymentResponse paymentResponse) {
|
||||
Optional<OrderPaymentOutboxMessage> orderPaymentOutboxMessageResponse =
|
||||
paymentOutboxHelper.getPaymentOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(data.getSagaId()),
|
||||
SagaStatus.STARTED)
|
||||
.orElseThrow(() -> {
|
||||
log.error("Payment outbox message not found for saga id: {}", data.getSagaId());
|
||||
return new OrderDomainException("Payment outbox message not found for saga id: " + data.getSagaId());
|
||||
});
|
||||
UUID.fromString(paymentResponse.getSagaId()),
|
||||
SagaStatus.STARTED);
|
||||
|
||||
|
||||
var paidEvent = completePaymentForOrder(data);
|
||||
|
||||
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(paidEvent.getOrder().getStatus());
|
||||
|
||||
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(messageResponse,
|
||||
paidEvent.getOrder().getStatus(),
|
||||
sagaStatus));
|
||||
|
||||
approvalOutboxHelper.saveApprovalOutboxMessage(
|
||||
orderDataMapper.orderPaidEventToOrderApprovalEventPayload(paidEvent),
|
||||
paidEvent.getOrder().getStatus(),
|
||||
sagaStatus,
|
||||
OutboxStatus.STARTED,
|
||||
messageResponse.getSagaId()
|
||||
);
|
||||
|
||||
log.info("Payment completed for order with id: {}", paidEvent.getOrder().getId().getValue());
|
||||
}
|
||||
|
||||
private OrderPaymentOutboxMessage getUpdatedPaymentOutboxMessage(OrderPaymentOutboxMessage messageResponse,
|
||||
OrderStatus status,
|
||||
SagaStatus sagaStatus) {
|
||||
messageResponse.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
messageResponse.setOrderStatus(status);
|
||||
messageResponse.setSagaStatus(sagaStatus);
|
||||
return messageResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void rollback(PaymentResponse data) {
|
||||
|
||||
var messageResponse =
|
||||
paymentOutboxHelper.getPaymentOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(data.getSagaId()),
|
||||
getCurrentSagaStatus(data.getPaymentStatus()))
|
||||
.orElseThrow(
|
||||
() -> {
|
||||
log.error("Payment outbox message not found for saga id: {}", data.getSagaId());
|
||||
return new OrderDomainException("Payment outbox message not found for saga id: " + data.getSagaId());
|
||||
}
|
||||
);
|
||||
|
||||
var orderRollback = rollbackPaymentForOrder(data);
|
||||
|
||||
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(orderRollback.getStatus());
|
||||
|
||||
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(messageResponse,
|
||||
orderRollback.getStatus(),
|
||||
sagaStatus));
|
||||
|
||||
if (data.getPaymentStatus().equals(PaymentStatus.CANCELED)) {
|
||||
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(data.getSagaId(),
|
||||
orderRollback.getStatus(),
|
||||
sagaStatus));
|
||||
if (orderPaymentOutboxMessageResponse.isEmpty()) {
|
||||
log.info("An outbox message with saga id: {} is already processed!", paymentResponse.getSagaId());
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Payment rolled back for order with id: {}", orderRollback.getId());
|
||||
OrderPaymentOutboxMessage orderPaymentOutboxMessage = orderPaymentOutboxMessageResponse.get();
|
||||
|
||||
OrderPaidEvent domainEvent = completePaymentForOrder(paymentResponse);
|
||||
|
||||
SagaStatus sagaStatus = orderSagaHelper.orderStatusToSagaStatus(domainEvent.getOrder().getStatus());
|
||||
|
||||
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(orderPaymentOutboxMessage,
|
||||
domainEvent.getOrder().getStatus(), sagaStatus));
|
||||
|
||||
approvalOutboxHelper
|
||||
.saveApprovalOutboxMessage(orderDataMapper.orderPaidEventToOrderApprovalEventPayload(domainEvent),
|
||||
domainEvent.getOrder().getStatus(),
|
||||
sagaStatus,
|
||||
OutboxStatus.STARTED,
|
||||
UUID.fromString(paymentResponse.getSagaId()));
|
||||
|
||||
log.info("Order with id: {} is paid", domainEvent.getOrder().getId().getValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void rollback(PaymentResponse paymentResponse) {
|
||||
|
||||
Optional<OrderPaymentOutboxMessage> orderPaymentOutboxMessageResponse =
|
||||
paymentOutboxHelper.getPaymentOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(paymentResponse.getSagaId()),
|
||||
getCurrentSagaStatus(paymentResponse.getPaymentStatus()));
|
||||
|
||||
if (orderPaymentOutboxMessageResponse.isEmpty()) {
|
||||
log.info("An outbox message with saga id: {} is already roll backed!", paymentResponse.getSagaId());
|
||||
return;
|
||||
}
|
||||
|
||||
OrderPaymentOutboxMessage orderPaymentOutboxMessage = orderPaymentOutboxMessageResponse.get();
|
||||
|
||||
Order order = rollbackPaymentForOrder(paymentResponse);
|
||||
|
||||
SagaStatus sagaStatus = orderSagaHelper.orderStatusToSagaStatus(order.getStatus());
|
||||
|
||||
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(orderPaymentOutboxMessage,
|
||||
order.getStatus(), sagaStatus));
|
||||
|
||||
if (paymentResponse.getPaymentStatus() == PaymentStatus.CANCELED) {
|
||||
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(paymentResponse.getSagaId(),
|
||||
order.getStatus(), sagaStatus));
|
||||
}
|
||||
|
||||
log.info("Order with id: {} is cancelled", order.getId().getValue());
|
||||
}
|
||||
|
||||
private Order findOrder(String orderId) {
|
||||
Optional<Order> orderResponse = orderRepository.findById(new OrderId(UUID.fromString(orderId)));
|
||||
if (orderResponse.isEmpty()) {
|
||||
log.error("Order with id: {} could not be found!", orderId);
|
||||
throw new OrderNotFoundException("Order with id " + orderId + " could not be found!");
|
||||
}
|
||||
return orderResponse.get();
|
||||
}
|
||||
|
||||
private OrderPaymentOutboxMessage getUpdatedPaymentOutboxMessage(OrderPaymentOutboxMessage
|
||||
orderPaymentOutboxMessage,
|
||||
OrderStatus
|
||||
orderStatus,
|
||||
SagaStatus
|
||||
sagaStatus) {
|
||||
orderPaymentOutboxMessage.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
orderPaymentOutboxMessage.setOrderStatus(orderStatus);
|
||||
orderPaymentOutboxMessage.setSagaStatus(sagaStatus);
|
||||
return orderPaymentOutboxMessage;
|
||||
}
|
||||
|
||||
private OrderPaidEvent completePaymentForOrder(PaymentResponse paymentResponse) {
|
||||
log.info("Completing payment for order with id: {}", paymentResponse.getOrderId());
|
||||
Order order = findOrder(paymentResponse.getOrderId());
|
||||
OrderPaidEvent domainEvent = orderDomainService.payOrder(order);
|
||||
orderRepository.save(order);
|
||||
return domainEvent;
|
||||
}
|
||||
|
||||
private SagaStatus[] getCurrentSagaStatus(PaymentStatus paymentStatus) {
|
||||
return switch (paymentStatus) {
|
||||
case COMPLETED -> new SagaStatus[] { SagaStatus.STARTED };
|
||||
case CANCELED -> new SagaStatus[] { SagaStatus.PROCESSING };
|
||||
case FAILED -> new SagaStatus[] { SagaStatus.STARTED, SagaStatus.PROCESSING };
|
||||
};
|
||||
}
|
||||
|
||||
private Order rollbackPaymentForOrder(PaymentResponse paymentResponse) {
|
||||
log.info("Cancelling order with id: {}", paymentResponse.getOrderId());
|
||||
Order order = findOrder(paymentResponse.getOrderId());
|
||||
orderDomainService.cancelOrder(order, paymentResponse.getFailureMessages());
|
||||
orderRepository.save(order);
|
||||
return order;
|
||||
}
|
||||
|
||||
private OrderApprovalOutboxMessage getUpdatedApprovalOutboxMessage(String sagaId,
|
||||
OrderStatus orderStatus,
|
||||
SagaStatus sagaStatus) {
|
||||
var orderApprovalOutboxMessageResponse =
|
||||
Optional<OrderApprovalOutboxMessage> orderApprovalOutboxMessageResponse =
|
||||
approvalOutboxHelper.getApprovalOutboxMessageBySagaIdAndSagaStatus(
|
||||
UUID.fromString(sagaId),
|
||||
SagaStatus.COMPENSATING);
|
||||
@@ -122,34 +159,10 @@ public class OrderPaymentSaga implements SagaStep<PaymentResponse> {
|
||||
throw new OrderDomainException("Approval outbox message could not be found in " +
|
||||
SagaStatus.COMPENSATING.name() + " status!");
|
||||
}
|
||||
var orderApprovalOutboxMessage = orderApprovalOutboxMessageResponse.get();
|
||||
OrderApprovalOutboxMessage orderApprovalOutboxMessage = orderApprovalOutboxMessageResponse.get();
|
||||
orderApprovalOutboxMessage.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
|
||||
orderApprovalOutboxMessage.setOrderStatus(orderStatus);
|
||||
orderApprovalOutboxMessage.setSagaStatus(sagaStatus);
|
||||
return orderApprovalOutboxMessage;
|
||||
}
|
||||
|
||||
private Order rollbackPaymentForOrder(PaymentResponse paymentResponse) {
|
||||
log.info("Cancelling order with id: {}", paymentResponse.getOrderId());
|
||||
var order = orderSagaHelper.findOrder(paymentResponse.getOrderId());
|
||||
orderDomainService.cancelOrder(order, paymentResponse.getFailureMessages());
|
||||
orderSagaHelper.saveOrder(order);
|
||||
return order;
|
||||
}
|
||||
|
||||
private SagaStatus[] getCurrentSagaStatus(PaymentStatus paymentStatus) {
|
||||
return switch (paymentStatus) {
|
||||
case COMPLETED -> new SagaStatus[]{SagaStatus.STARTED};
|
||||
case CANCELED -> new SagaStatus[]{SagaStatus.PROCESSING};
|
||||
case FAILED -> new SagaStatus[]{SagaStatus.STARTED, SagaStatus.PROCESSING};
|
||||
};
|
||||
}
|
||||
|
||||
private OrderPaidEvent completePaymentForOrder(PaymentResponse data) {
|
||||
var order = orderSagaHelper.findOrder(data.getOrderId());
|
||||
var paidEvent = orderDomainService.payOrder(order);
|
||||
orderSagaHelper.saveOrder(order);
|
||||
log.info("Payment completed for order with id: {}", order.getId());
|
||||
return paidEvent;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,7 +176,7 @@ class OrderApplicationServiceTest {
|
||||
void testCreateOrderWithWrongTotalPrice() {
|
||||
OrderDomainException orderDomainException = assertThrows(OrderDomainException.class,
|
||||
() -> orderApplicationService.createOrder(createOrderCommandWrongPrice));
|
||||
assertEquals("Total price: 250.00 is not equal to Order items total: 200.00!",
|
||||
assertEquals("Order total price is not equal to the sum of order items prices",
|
||||
orderDomainException.getMessage());
|
||||
}
|
||||
|
||||
@@ -185,7 +185,7 @@ class OrderApplicationServiceTest {
|
||||
OrderDomainException orderDomainException = assertThrows(OrderDomainException.class,
|
||||
() -> orderApplicationService.createOrder(createOrderCommandWrongProductPrice));
|
||||
assertEquals(orderDomainException.getMessage(),
|
||||
"Order item price: 60.00 is not valid for product " + PRODUCT_ID);
|
||||
"Order item price is not valid");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -201,7 +201,7 @@ class OrderApplicationServiceTest {
|
||||
OrderDomainException orderDomainException = assertThrows(OrderDomainException.class,
|
||||
() -> orderApplicationService.createOrder(createOrderCommand));
|
||||
assertEquals(orderDomainException.getMessage(),
|
||||
"Restaurant with id " + RESTAURANT_ID + " is currently not active!");
|
||||
"Restaurant is not active, please try again later. Restaurant id: " + restaurantResponse.getId());
|
||||
}
|
||||
|
||||
private OrderPaymentOutboxMessage getOrderPaymentOutboxMessage() {
|
||||
|
||||
Reference in New Issue
Block a user