Outbox Message and Scheduler class implemented part - 2.

This commit is contained in:
Ali CANLI
2022-07-15 21:45:48 +03:00
parent d49b533f7e
commit 3affe7ccdd
25 changed files with 913 additions and 96 deletions

View File

@@ -18,9 +18,8 @@ public class PaymentResponseMessageListenerImpl implements PaymentResponseMessag
@Override
public void paymentCompleted(PaymentResponse paymentResponse) {
var paidEvent = orderPaymentSaga.process(paymentResponse);
log.info("Payment completed for order with id: {}", paidEvent.getOrder().getId());
paidEvent.fire();
orderPaymentSaga.process(paymentResponse);
log.info("Order Payment saga process completed id {}", paymentResponse.getId());
}
@Override

View File

@@ -22,8 +22,7 @@ public class RestaurantApprovalResponseMessageListenerImpl implements Restaurant
@Override
public void orderRejected(RestaurantApprovalResponse restaurantApprovalResponse) {
var event = orderApprovalSaga.rollback(restaurantApprovalResponse);
orderApprovalSaga.rollback(restaurantApprovalResponse);
log.info("Order Rejected: {}", restaurantApprovalResponse);
event.fire();
}
}

View File

@@ -5,11 +5,14 @@ import com.food.order.system.domain.entity.OrderItem;
import com.food.order.system.domain.entity.Product;
import com.food.order.system.domain.entity.Restaurant;
import com.food.order.system.domain.event.OrderCreatedEvent;
import com.food.order.system.domain.event.OrderPaidEvent;
import com.food.order.system.domain.valueobject.StreetAddress;
import com.food.order.sysyem.dto.create.CreateOrderCommand;
import com.food.order.sysyem.dto.create.CreateOrderResponse;
import com.food.order.sysyem.dto.create.OrderAddress;
import com.food.order.sysyem.dto.track.TrackOrderResponse;
import com.food.order.sysyem.outbox.model.approval.OrderApprovalEventPayload;
import com.food.order.sysyem.outbox.model.approval.OrderApprovalProduct;
import com.food.order.sysyem.outbox.model.payment.OrderPaymentEventPayload;
import com.food.order.sysyem.valueobject.*;
import org.springframework.stereotype.Component;
@@ -20,6 +23,22 @@ import java.util.UUID;
@Component
public class OrderDataMapper {
public OrderApprovalEventPayload orderPaidEventToOrderApprovalEventPayload(OrderPaidEvent orderPaidEvent) {
return OrderApprovalEventPayload.builder()
.orderId(orderPaidEvent.getOrder().getId().getValue().toString())
.restaurantId(orderPaidEvent.getOrder().getRestaurantId().getValue().toString())
.restaurantOrderStatus(RestaurantOrderStatus.PAID.name())
.products(orderPaidEvent.getOrder().getItems().stream()
.map(orderItem -> OrderApprovalProduct.builder()
.id(orderItem.getProduct().getId().getValue().toString())
.quantity(orderItem.getQuantity())
.build())
.toList())
.price(orderPaidEvent.getOrder().getPrice().getAmount())
.createdAt(orderPaidEvent.getCreatedAt())
.build();
}
public OrderPaymentEventPayload orderCreatedEventToOrderPaymentEventPayload(OrderCreatedEvent order) {
return OrderPaymentEventPayload.builder()
.orderId(order.getOrder().getId().getValue().toString())

View File

@@ -27,12 +27,16 @@ public class OrderPaymentOutboxMessage {
private String type;
private String payload;
@Setter
private SagaStatus sagaStatus;
@Setter
private OrderStatus orderStatus;
@Setter
private OutboxStatus outboxStatus;
private int version;

View File

@@ -1,10 +1,14 @@
package com.food.order.sysyem.outbox.scheduler.approval;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.food.order.system.domain.exception.OrderDomainException;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.saga.SagaStatus;
import com.food.order.sysyem.outbox.model.approval.OrderApprovalEventPayload;
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
import com.food.order.sysyem.ports.output.repository.ApprovalOutboxRepository;
import com.food.order.sysyem.valueobject.OrderStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -24,6 +28,8 @@ public class ApprovalOutboxHelper {
private final ApprovalOutboxRepository approvalOutboxRepository;
private final ObjectMapper objectMapper;
@Transactional(readOnly = true)
public Optional<List<OrderApprovalOutboxMessage>> getApprovalOutboxMessageByOutboxStatusAndSagaStatus
(OutboxStatus outboxStatus, SagaStatus... sagaStatus) {
@@ -63,4 +69,33 @@ public class ApprovalOutboxHelper {
sagaStatus);
}
@Transactional
public void saveApprovalOutboxMessage(OrderApprovalEventPayload payload,
OrderStatus orderStatus,
SagaStatus sagaStatus,
OutboxStatus outboxStatus,
UUID sagaId) {
save(OrderApprovalOutboxMessage.builder()
.id(UUID.randomUUID())
.type(ORDER_PROCESSING_SAGA)
.createdAt(payload.getCreatedAt())
.orderStatus(orderStatus)
.sagaStatus(sagaStatus)
.outboxStatus(outboxStatus)
.sagaId(sagaId)
.payload(createPayload(payload))
.build());
}
private String createPayload(OrderApprovalEventPayload payload) {
try {
return objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
throw new OrderDomainException("Failed to create payload for JSON message");
}
}
}

View File

@@ -1,10 +1,8 @@
package com.food.order.sysyem.saga;
import com.food.order.system.domain.event.OrderCancelledEvent;
import com.food.order.system.domain.service.OrderDomainService;
import com.food.order.system.saga.SagaStep;
import com.food.order.sysyem.dto.message.RestaurantApprovalResponse;
import com.food.order.sysyem.event.EmptyEvent;
import com.food.order.sysyem.helper.OrderSagaHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -14,7 +12,7 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderApprovalSaga implements SagaStep<RestaurantApprovalResponse, EmptyEvent, OrderCancelledEvent> {
public class OrderApprovalSaga implements SagaStep<RestaurantApprovalResponse> {
private final OrderDomainService orderDomainService;
private final OrderSagaHelper orderSagaHelper;
@@ -23,27 +21,24 @@ public class OrderApprovalSaga implements SagaStep<RestaurantApprovalResponse, E
@Override
@Transactional
public EmptyEvent process(RestaurantApprovalResponse data) {
public void process(RestaurantApprovalResponse data) {
log.info("Approving order with id: {}", data.getOrderId());
var order = orderSagaHelper.findOrder(data.getOrderId());
orderDomainService.approve(order);
orderSagaHelper.saveOrder(order);
log.info("Order approved: {}", order);
return EmptyEvent.INSTANCE;
}
@Override
@Transactional
public OrderCancelledEvent rollback(RestaurantApprovalResponse data) {
public void rollback(RestaurantApprovalResponse data) {
log.info("Approving order with id: {}", data.getOrderId());
var order = orderSagaHelper.findOrder(data.getOrderId());
var cancelEvent = orderDomainService.cancelOrderPayment
(order,
data.getFailureMessages(),
messagePublisher);
data.getFailureMessages());
orderSagaHelper.saveOrder(order);
log.info("Order cancelled: {}", order);
return cancelEvent;
}
}

View File

@@ -1,44 +1,157 @@
package com.food.order.sysyem.saga;
import com.food.order.system.domain.entity.Order;
import com.food.order.system.domain.event.OrderPaidEvent;
import com.food.order.system.domain.exception.OrderDomainException;
import com.food.order.system.domain.service.OrderDomainService;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.saga.SagaStatus;
import com.food.order.system.saga.SagaStep;
import com.food.order.sysyem.dto.message.PaymentResponse;
import com.food.order.sysyem.event.EmptyEvent;
import com.food.order.sysyem.helper.OrderSagaHelper;
import com.food.order.sysyem.mapper.OrderDataMapper;
import com.food.order.sysyem.outbox.model.approval.OrderApprovalOutboxMessage;
import com.food.order.sysyem.outbox.model.payment.OrderPaymentOutboxMessage;
import com.food.order.sysyem.outbox.scheduler.approval.ApprovalOutboxHelper;
import com.food.order.sysyem.outbox.scheduler.payment.PaymentOutboxHelper;
import com.food.order.sysyem.valueobject.OrderStatus;
import com.food.order.sysyem.valueobject.PaymentStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.UUID;
import static com.food.order.sysyem.DomainConstants.UTC;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderPaymentSaga implements SagaStep<PaymentResponse, OrderPaidEvent, EmptyEvent> {
public class OrderPaymentSaga implements SagaStep<PaymentResponse> {
private final OrderDomainService orderDomainService;
private final OrderSagaHelper orderSagaHelper;
private final OrderPaidRestaurantRequestMessagePublisher orderPaidRestaurantRequestMessagePublisher;
private final OrderDataMapper orderDataMapper;
private final ApprovalOutboxHelper approvalOutboxHelper;
private final PaymentOutboxHelper paymentOutboxHelper;
@Override
@Transactional
public OrderPaidEvent process(PaymentResponse data) {
log.info("Completing payment for order with id: {}", data.getOrderId());
public void process(PaymentResponse data) {
var messageResponse =
paymentOutboxHelper.getPaymentOutboxMessageBySagaIdAndSagaStatus(
UUID.fromString(data.getSagaId()),
SagaStatus.STARTED)
.orElseThrow(() -> {
log.error("Payment outbox message not found for saga id: {}", data.getSagaId());
return new OrderDomainException("Payment outbox message not found for saga id: " + data.getSagaId());
});
var paidEvent = completePaymentForOrder(data);
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(paidEvent.getOrder().getStatus());
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(messageResponse,
paidEvent.getOrder().getStatus(),
sagaStatus));
approvalOutboxHelper.saveApprovalOutboxMessage(
orderDataMapper.orderPaidEventToOrderApprovalEventPayload(paidEvent),
paidEvent.getOrder().getStatus(),
sagaStatus,
OutboxStatus.STARTED,
messageResponse.getSagaId()
);
log.info("Payment completed for order with id: {}", paidEvent.getOrder().getId().getValue());
}
private OrderPaymentOutboxMessage getUpdatedPaymentOutboxMessage(OrderPaymentOutboxMessage messageResponse,
OrderStatus status,
SagaStatus sagaStatus) {
messageResponse.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
messageResponse.setOrderStatus(status);
messageResponse.setSagaStatus(sagaStatus);
return messageResponse;
}
@Override
@Transactional
public void rollback(PaymentResponse data) {
var messageResponse =
paymentOutboxHelper.getPaymentOutboxMessageBySagaIdAndSagaStatus(
UUID.fromString(data.getSagaId()),
getCurrentSagaStatus(data.getPaymentStatus()))
.orElseThrow(
() -> {
log.error("Payment outbox message not found for saga id: {}", data.getSagaId());
return new OrderDomainException("Payment outbox message not found for saga id: " + data.getSagaId());
}
);
var orderRollback = rollbackPaymentForOrder(data);
var sagaStatus = orderSagaHelper.orderStatusToSagaStatus(orderRollback.getStatus());
paymentOutboxHelper.save(getUpdatedPaymentOutboxMessage(messageResponse,
orderRollback.getStatus(),
sagaStatus));
if (data.getPaymentStatus().equals(PaymentStatus.CANCELED)) {
approvalOutboxHelper.save(getUpdatedApprovalOutboxMessage(data.getSagaId(),
orderRollback.getStatus(),
sagaStatus));
}
log.info("Payment rolled back for order with id: {}", orderRollback.getId());
}
private OrderApprovalOutboxMessage getUpdatedApprovalOutboxMessage(String sagaId,
OrderStatus orderStatus,
SagaStatus sagaStatus) {
var orderApprovalOutboxMessageResponse =
approvalOutboxHelper.getApprovalOutboxMessageBySagaIdAndSagaStatus(
UUID.fromString(sagaId),
SagaStatus.COMPENSATING);
if (orderApprovalOutboxMessageResponse.isEmpty()) {
throw new OrderDomainException("Approval outbox message could not be found in " +
SagaStatus.COMPENSATING.name() + " status!");
}
var orderApprovalOutboxMessage = orderApprovalOutboxMessageResponse.get();
orderApprovalOutboxMessage.setProcessedAt(ZonedDateTime.now(ZoneId.of(UTC)));
orderApprovalOutboxMessage.setOrderStatus(orderStatus);
orderApprovalOutboxMessage.setSagaStatus(sagaStatus);
return orderApprovalOutboxMessage;
}
private Order rollbackPaymentForOrder(PaymentResponse paymentResponse) {
log.info("Cancelling order with id: {}", paymentResponse.getOrderId());
var order = orderSagaHelper.findOrder(paymentResponse.getOrderId());
orderDomainService.cancelOrder(order, paymentResponse.getFailureMessages());
orderSagaHelper.saveOrder(order);
return order;
}
private SagaStatus[] getCurrentSagaStatus(PaymentStatus paymentStatus) {
return switch (paymentStatus) {
case COMPLETED -> new SagaStatus[]{SagaStatus.STARTED};
case CANCELED -> new SagaStatus[]{SagaStatus.PROCESSING};
case FAILED -> new SagaStatus[]{SagaStatus.STARTED, SagaStatus.PROCESSING};
};
}
private OrderPaidEvent completePaymentForOrder(PaymentResponse data) {
var order = orderSagaHelper.findOrder(data.getOrderId());
var paidEvent = orderDomainService.payOrder(order,orderPaidRestaurantRequestMessagePublisher);
var paidEvent = orderDomainService.payOrder(order);
orderSagaHelper.saveOrder(order);
log.info("Payment completed for order with id: {}", order.getId());
return paidEvent;
}
@Override
@Transactional
public EmptyEvent rollback(PaymentResponse data) {
log.info("Rolling back payment for order with id: {}", data.getOrderId());
var order = orderSagaHelper.findOrder(data.getOrderId());
orderDomainService.cancelOrder(order,data.getFailureMessages());
orderSagaHelper.saveOrder(order);
log.info("Payment rolled back for order with id: {}", order.getId());
return EmptyEvent.INSTANCE;
}
}

View File

@@ -7,16 +7,8 @@ import java.time.ZonedDateTime;
public class OrderCancelledEvent extends OrderEvent {
private final DomainEventPublisher<OrderCancelledEvent> publisher;
public OrderCancelledEvent(Order order, ZonedDateTime utc, DomainEventPublisher<OrderCancelledEvent> publisher) {
public OrderCancelledEvent(Order order, ZonedDateTime utc) {
super(order, utc);
this.publisher = publisher;
}
@Override
public void fire() {
publisher.publish(this);
}
}

View File

@@ -1,20 +1,13 @@
package com.food.order.system.domain.event;
import com.food.order.system.domain.entity.Order;
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
import java.time.ZonedDateTime;
public class OrderCreatedEvent extends OrderEvent {
private final DomainEventPublisher<OrderCreatedEvent> publisher;
public OrderCreatedEvent(Order order, ZonedDateTime createdAt, DomainEventPublisher<OrderCreatedEvent> publisher) {
public OrderCreatedEvent(Order order, ZonedDateTime createdAt) {
super(order, createdAt);
this.publisher = publisher;
}
@Override
public void fire() {
publisher.publish(this);
}
}

View File

@@ -7,15 +7,10 @@ import java.time.ZonedDateTime;
public class OrderPaidEvent extends OrderEvent {
private final DomainEventPublisher<OrderPaidEvent> publisher;
public OrderPaidEvent(Order order, ZonedDateTime utc, DomainEventPublisher<OrderPaidEvent> publisher) {
public OrderPaidEvent(Order order, ZonedDateTime utc) {
super(order, utc);
this.publisher = publisher;
}
@Override
public void fire() {
publisher.publish(this);
}
}

View File

@@ -5,19 +5,18 @@ import com.food.order.system.domain.entity.Restaurant;
import com.food.order.system.domain.event.OrderCancelledEvent;
import com.food.order.system.domain.event.OrderCreatedEvent;
import com.food.order.system.domain.event.OrderPaidEvent;
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
import java.util.List;
public interface OrderDomainService {
OrderCreatedEvent validateAndInitiateOrder(Order order, Restaurant restaurant, DomainEventPublisher<OrderCreatedEvent> publisher);
OrderCreatedEvent validateAndInitiateOrder(Order order, Restaurant restaurant);
OrderPaidEvent payOrder(Order order,DomainEventPublisher<OrderPaidEvent> publisher);
OrderPaidEvent payOrder(Order order);
void approve(Order order);
OrderCancelledEvent cancelOrderPayment(Order order, List<String> failureMessages,DomainEventPublisher<OrderCancelledEvent> publisher);
OrderCancelledEvent cancelOrderPayment(Order order, List<String> failureMessages);
void cancelOrder(Order order, List<String> failureMessages);

View File

@@ -7,7 +7,6 @@ import com.food.order.system.domain.event.OrderCreatedEvent;
import com.food.order.system.domain.event.OrderPaidEvent;
import com.food.order.system.domain.exception.OrderDomainException;
import com.food.order.system.domain.service.OrderDomainService;
import com.food.order.sysyem.event.publisher.DomainEventPublisher;
import lombok.extern.slf4j.Slf4j;
import java.time.ZoneId;
@@ -20,14 +19,13 @@ public class OrderDomainServiceImpl implements OrderDomainService {
private static final String UTC = "UTC";
@Override
public OrderCreatedEvent validateAndInitiateOrder(Order order, Restaurant restaurant,
DomainEventPublisher<OrderCreatedEvent> publisher) {
public OrderCreatedEvent validateAndInitiateOrder(Order order, Restaurant restaurant) {
validateRestaurant(restaurant);
setOrderProductInformation(order,restaurant);
order.validateOrder();
order.initializeOrder();
log.info("Order with id {} initialize successfully", order.getId().getValue());
return new OrderCreatedEvent(order, ZonedDateTime.now(ZoneId.of(UTC)),publisher);
return new OrderCreatedEvent(order, ZonedDateTime.now(ZoneId.of(UTC)));
}
private void setOrderProductInformation(Order order, Restaurant restaurant) {
@@ -49,10 +47,10 @@ public class OrderDomainServiceImpl implements OrderDomainService {
}
@Override
public OrderPaidEvent payOrder(Order order, DomainEventPublisher<OrderPaidEvent> publisher) {
public OrderPaidEvent payOrder(Order order) {
order.pay();
log.info("Order with id {} paid successfully", order.getId().getValue());
return new OrderPaidEvent(order, ZonedDateTime.now(ZoneId.of(UTC)),publisher);
return new OrderPaidEvent(order, ZonedDateTime.now(ZoneId.of(UTC)));
}
@Override
@@ -62,11 +60,10 @@ public class OrderDomainServiceImpl implements OrderDomainService {
}
@Override
public OrderCancelledEvent cancelOrderPayment(Order order, List<String> failureMessages,
DomainEventPublisher<OrderCancelledEvent> publisher) {
public OrderCancelledEvent cancelOrderPayment(Order order, List<String> failureMessages) {
order.initCancel(failureMessages);
log.info("Order with id {} cancelled successfully", order.getId().getValue());
return new OrderCancelledEvent(order, ZonedDateTime.now(ZoneId.of(UTC)),publisher);
return new OrderCancelledEvent(order, ZonedDateTime.now(ZoneId.of(UTC)));
}
@Override