diff --git a/restaurant-service/restaurant-container/src/main/resources/application.yml b/restaurant-service/restaurant-container/src/main/resources/application.yml index 8e36e2f..2d20c61 100644 --- a/restaurant-service/restaurant-container/src/main/resources/application.yml +++ b/restaurant-service/restaurant-container/src/main/resources/application.yml @@ -8,6 +8,8 @@ logging: restaurant-service: restaurant-approval-request-topic-name: restaurant-approval-request-value restaurant-approval-response-topic-name: restaurant-approval-response-value + outbox-scheduler-fixed-rate: 10000 + outbox-scheduler-initial-delay: 10000 spring: jpa: diff --git a/restaurant-service/restaurant-container/src/main/resources/init-data.sql b/restaurant-service/restaurant-container/src/main/resources/init-data.sql index 6311c69..5307e48 100644 --- a/restaurant-service/restaurant-container/src/main/resources/init-data.sql +++ b/restaurant-service/restaurant-container/src/main/resources/init-data.sql @@ -1,22 +1,22 @@ INSERT INTO restaurant.restaurants(id, name, active) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb45', 'restaurant_1', TRUE); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb45', 'restaurant_1', TRUE); INSERT INTO restaurant.restaurants(id, name, active) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb46', 'restaurant_2', FALSE); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb46', 'restaurant_2', FALSE); INSERT INTO restaurant.products(id, name, price, available) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb47', 'product_1', 25.00, FALSE); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb47', 'product_1', 25.00, FALSE); INSERT INTO restaurant.products(id, name, price, available) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb48', 'product_2', 50.00, TRUE); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb48', 'product_2', 50.00, TRUE); INSERT INTO restaurant.products(id, name, price, available) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb49', 'product_3', 20.00, FALSE); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb49', 'product_3', 20.00, FALSE); INSERT INTO restaurant.products(id, name, price, available) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb50', 'product_4', 40.00, TRUE); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb50', 'product_4', 40.00, TRUE); INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb51', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb47'); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb51', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb47'); INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb52', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb48'); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb52', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb48'); INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb53', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb49'); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb53', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb49'); INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id) - VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb54', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb50'); +VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb54', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb50'); diff --git a/restaurant-service/restaurant-container/src/main/resources/init-schema.sql b/restaurant-service/restaurant-container/src/main/resources/init-schema.sql index 8f47d26..ef0fadd 100644 --- a/restaurant-service/restaurant-container/src/main/resources/init-schema.sql +++ b/restaurant-service/restaurant-container/src/main/resources/init-schema.sql @@ -52,34 +52,61 @@ CREATE TABLE restaurant.restaurant_products ALTER TABLE restaurant.restaurant_products ADD CONSTRAINT "FK_RESTAURANT_ID" FOREIGN KEY (restaurant_id) - REFERENCES restaurant.restaurants (id) MATCH SIMPLE - ON UPDATE NO ACTION - ON DELETE RESTRICT - NOT VALID; + REFERENCES restaurant.restaurants (id) MATCH SIMPLE + ON UPDATE NO ACTION + ON DELETE RESTRICT + NOT VALID; ALTER TABLE restaurant.restaurant_products ADD CONSTRAINT "FK_PRODUCT_ID" FOREIGN KEY (product_id) - REFERENCES restaurant.products (id) MATCH SIMPLE - ON UPDATE NO ACTION - ON DELETE RESTRICT - NOT VALID; + REFERENCES restaurant.products (id) MATCH SIMPLE + ON UPDATE NO ACTION + ON DELETE RESTRICT + NOT VALID; + +DROP TYPE IF EXISTS outbox_status; +CREATE TYPE outbox_status AS ENUM ('STARTED', 'COMPLETED', 'FAILED'); + +DROP TABLE IF EXISTS restaurant.order_outbox CASCADE; + +CREATE TABLE restaurant.order_outbox +( + id uuid NOT NULL, + saga_id uuid NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + processed_at TIMESTAMP WITH TIME ZONE, + type character varying COLLATE pg_catalog."default" NOT NULL, + payload jsonb NOT NULL, + outbox_status outbox_status NOT NULL, + approval_status approval_status NOT NULL, + version integer NOT NULL, + CONSTRAINT order_outbox_pkey PRIMARY KEY (id) +); + +CREATE INDEX "restaurant_order_outbox_saga_status" + ON "restaurant".order_outbox + (type, approval_status); + +CREATE UNIQUE INDEX "restaurant_order_outbox_saga_id" + ON "restaurant".order_outbox + (type, saga_id, approval_status, outbox_status); DROP MATERIALIZED VIEW IF EXISTS restaurant.order_restaurant_m_view; CREATE MATERIALIZED VIEW restaurant.order_restaurant_m_view -TABLESPACE pg_default + TABLESPACE pg_default AS - SELECT r.id AS restaurant_id, - r.name AS restaurant_name, - r.active AS restaurant_active, - p.id AS product_id, - p.name AS product_name, - p.price AS product_price, - p.available AS product_active - FROM restaurant.restaurants r, - restaurant.products p, - restaurant.restaurant_products rp - WHERE r.id = rp.restaurant_id AND p.id = rp.product_id +SELECT r.id AS restaurant_id, + r.name AS restaurant_name, + r.active AS restaurant_active, + p.id AS product_id, + p.name AS product_name, + p.price AS product_price, + p.available AS product_available +FROM restaurant.restaurants r, + restaurant.products p, + restaurant.restaurant_products rp +WHERE r.id = rp.restaurant_id AND p.id = rp.product_id WITH DATA; refresh materialized VIEW restaurant.order_restaurant_m_view; @@ -87,17 +114,17 @@ refresh materialized VIEW restaurant.order_restaurant_m_view; DROP function IF EXISTS restaurant.refresh_order_restaurant_m_view; CREATE OR replace function restaurant.refresh_order_restaurant_m_view() -returns trigger + returns trigger AS ' -BEGIN - refresh materialized VIEW restaurant.order_restaurant_m_view; - return null; -END; + BEGIN + refresh materialized VIEW restaurant.order_restaurant_m_view; + return null; + END; ' LANGUAGE plpgsql; DROP trigger IF EXISTS refresh_order_restaurant_m_view ON restaurant.restaurant_products; CREATE trigger refresh_order_restaurant_m_view -after INSERT OR UPDATE OR DELETE OR truncate -ON restaurant.restaurant_products FOR each statement + after INSERT OR UPDATE OR DELETE OR truncate + ON restaurant.restaurant_products FOR each statement EXECUTE PROCEDURE restaurant.refresh_order_restaurant_m_view(); \ No newline at end of file diff --git a/restaurant-service/restaurant-dataaccess/pom.xml b/restaurant-service/restaurant-dataaccess/pom.xml index 0d8bc00..9d01c63 100644 --- a/restaurant-service/restaurant-dataaccess/pom.xml +++ b/restaurant-service/restaurant-dataaccess/pom.xml @@ -18,7 +18,10 @@ com.food.order restaurant-application-service - + + com.food.order + outbox + org.springframework.boot spring-boot-starter-data-jpa diff --git a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/adapter/OrderOutboxRepositoryImpl.java b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/adapter/OrderOutboxRepositoryImpl.java new file mode 100644 index 0000000..0655ac1 --- /dev/null +++ b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/adapter/OrderOutboxRepositoryImpl.java @@ -0,0 +1,54 @@ +package com.food.order.system.data.access.restaurant.outbox.adapter; + + +import com.food.order.system.data.access.restaurant.outbox.exception.OrderOutboxNotFoundException; +import com.food.order.system.data.access.restaurant.outbox.mapper.OrderOutboxDataAccessMapper; +import com.food.order.system.data.access.restaurant.outbox.repository.OrderOutboxJpaRepository; +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.restaurant.domain.service.ports.output.repository.OrderOutboxRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +@Component +@RequiredArgsConstructor +public class OrderOutboxRepositoryImpl implements OrderOutboxRepository { + + private final OrderOutboxJpaRepository orderOutboxJpaRepository; + private final OrderOutboxDataAccessMapper orderOutboxDataAccessMapper; + + @Override + public OrderOutboxMessage save(OrderOutboxMessage orderPaymentOutboxMessage) { + return orderOutboxDataAccessMapper + .orderOutboxEntityToOrderOutboxMessage(orderOutboxJpaRepository + .save(orderOutboxDataAccessMapper + .orderOutboxMessageToOutboxEntity(orderPaymentOutboxMessage))); + } + + @Override + public Optional> findByTypeAndOutboxStatus(String sagaType, OutboxStatus outboxStatus) { + return Optional.of(orderOutboxJpaRepository.findByTypeAndOutboxStatus(sagaType, outboxStatus) + .orElseThrow(() -> new OrderOutboxNotFoundException("Approval outbox object " + + "cannot be found for saga type " + sagaType)) + .stream() + .map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage) + .collect(Collectors.toList())); + } + + @Override + public Optional findByTypeAndSagaIdAndOutboxStatus(String type, UUID sagaId, + OutboxStatus outboxStatus) { + return orderOutboxJpaRepository.findByTypeAndSagaIdAndOutboxStatus(type, sagaId, outboxStatus) + .map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage); + } + + @Override + public void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus) { + orderOutboxJpaRepository.deleteByTypeAndOutboxStatus(type, outboxStatus); + } +} diff --git a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/entity/OrderOutboxEntity.java b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/entity/OrderOutboxEntity.java new file mode 100644 index 0000000..2c31a7e --- /dev/null +++ b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/entity/OrderOutboxEntity.java @@ -0,0 +1,48 @@ +package com.food.order.system.data.access.restaurant.outbox.entity; + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.valueobject.OrderApprovalStatus; +import lombok.*; + +import javax.persistence.*; +import java.time.ZonedDateTime; +import java.util.Objects; +import java.util.UUID; + +@Getter +@Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Table(name = "order_outbox") +@Entity +public class OrderOutboxEntity { + + @Id + private UUID id; + private UUID sagaId; + private ZonedDateTime createdAt; + private ZonedDateTime processedAt; + private String type; + private String payload; + @Enumerated(EnumType.STRING) + private OutboxStatus outboxStatus; + @Enumerated(EnumType.STRING) + private OrderApprovalStatus approvalStatus; + private int version; + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OrderOutboxEntity that = (OrderOutboxEntity) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } +} + diff --git a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/exception/OrderOutboxNotFoundException.java b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/exception/OrderOutboxNotFoundException.java new file mode 100644 index 0000000..b47c6e4 --- /dev/null +++ b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/exception/OrderOutboxNotFoundException.java @@ -0,0 +1,8 @@ +package com.food.order.system.data.access.restaurant.outbox.exception; + +public class OrderOutboxNotFoundException extends RuntimeException { + + public OrderOutboxNotFoundException(String message) { + super(message); + } +} diff --git a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/mapper/OrderOutboxDataAccessMapper.java b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/mapper/OrderOutboxDataAccessMapper.java new file mode 100644 index 0000000..929aabc --- /dev/null +++ b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/mapper/OrderOutboxDataAccessMapper.java @@ -0,0 +1,36 @@ +package com.food.order.system.data.access.restaurant.outbox.mapper; + +import com.food.order.system.data.access.restaurant.outbox.entity.OrderOutboxEntity; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import org.springframework.stereotype.Component; + +@Component +public class OrderOutboxDataAccessMapper { + + public OrderOutboxEntity orderOutboxMessageToOutboxEntity(OrderOutboxMessage orderOutboxMessage) { + return OrderOutboxEntity.builder() + .id(orderOutboxMessage.getId()) + .sagaId(orderOutboxMessage.getSagaId()) + .createdAt(orderOutboxMessage.getCreatedAt()) + .type(orderOutboxMessage.getType()) + .payload(orderOutboxMessage.getPayload()) + .outboxStatus(orderOutboxMessage.getOutboxStatus()) + .approvalStatus(orderOutboxMessage.getApprovalStatus()) + .version(orderOutboxMessage.getVersion()) + .build(); + } + + public OrderOutboxMessage orderOutboxEntityToOrderOutboxMessage(OrderOutboxEntity paymentOutboxEntity) { + return OrderOutboxMessage.builder() + .id(paymentOutboxEntity.getId()) + .sagaId(paymentOutboxEntity.getSagaId()) + .createdAt(paymentOutboxEntity.getCreatedAt()) + .type(paymentOutboxEntity.getType()) + .payload(paymentOutboxEntity.getPayload()) + .outboxStatus(paymentOutboxEntity.getOutboxStatus()) + .approvalStatus(paymentOutboxEntity.getApprovalStatus()) + .version(paymentOutboxEntity.getVersion()) + .build(); + } + +} diff --git a/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/repository/OrderOutboxJpaRepository.java b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/repository/OrderOutboxJpaRepository.java new file mode 100644 index 0000000..daa3d17 --- /dev/null +++ b/restaurant-service/restaurant-dataaccess/src/main/java/com/food/order/system/data/access/restaurant/outbox/repository/OrderOutboxJpaRepository.java @@ -0,0 +1,21 @@ +package com.food.order.system.data.access.restaurant.outbox.repository; + +import com.food.order.system.data.access.restaurant.outbox.entity.OrderOutboxEntity; +import com.food.order.system.outbox.OutboxStatus; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +@Repository +public interface OrderOutboxJpaRepository extends JpaRepository { + + Optional> findByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus); + + Optional findByTypeAndSagaIdAndOutboxStatus(String type, UUID sagaId, OutboxStatus outboxStatus); + + void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus); + +} diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/pom.xml b/restaurant-service/restaurant-domain/restaurant-application-service/pom.xml index 8a10a47..613a838 100644 --- a/restaurant-service/restaurant-domain/restaurant-application-service/pom.xml +++ b/restaurant-service/restaurant-domain/restaurant-application-service/pom.xml @@ -33,7 +33,18 @@ org.springframework spring-tx - + + com.food.order + outbox + + + com.food.order + saga + + + org.springframework.boot + spring-boot-starter-json + \ No newline at end of file diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestHelper.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestHelper.java index 3dc7de2..3a5f9d6 100644 --- a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestHelper.java +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestHelper.java @@ -1,16 +1,19 @@ package com.food.order.system.restaurant.domain.service; +import com.food.order.system.outbox.OutboxStatus; import com.food.order.system.restaurant.domain.core.RestaurantDomainService; import com.food.order.system.restaurant.domain.core.entity.Restaurant; import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent; import com.food.order.system.restaurant.domain.core.exception.RestaurantNotFoundException; import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest; import com.food.order.system.restaurant.domain.service.mapper.RestaurantDataMapper; -import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderApprovedMessagePublisher; -import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderRejectedMessagePublisher; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.restaurant.domain.service.outbox.scheduler.OrderOutboxHelper; +import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.RestaurantApprovalResponseMessagePublisher; import com.food.order.system.restaurant.domain.service.ports.output.repository.OrderApprovalRepository; import com.food.order.system.restaurant.domain.service.ports.output.repository.RestaurantRepository; import com.food.order.system.valueobject.OrderId; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -22,39 +25,40 @@ import java.util.UUID; @Slf4j @Component +@RequiredArgsConstructor public class RestaurantApprovalRequestHelper { private final RestaurantDomainService restaurantDomainService; private final RestaurantDataMapper restaurantDataMapper; private final RestaurantRepository restaurantRepository; private final OrderApprovalRepository orderApprovalRepository; - private final OrderApprovedMessagePublisher orderApprovedMessagePublisher; - private final OrderRejectedMessagePublisher orderRejectedMessagePublisher; + private final OrderOutboxHelper orderOutboxHelper; + private final RestaurantApprovalResponseMessagePublisher restaurantApprovalResponseMessagePublisher; - public RestaurantApprovalRequestHelper(RestaurantDomainService restaurantDomainService, - RestaurantDataMapper restaurantDataMapper, - RestaurantRepository restaurantRepository, - OrderApprovalRepository orderApprovalRepository, - OrderApprovedMessagePublisher orderApprovedMessagePublisher, - OrderRejectedMessagePublisher orderRejectedMessagePublisher) { - this.restaurantDomainService = restaurantDomainService; - this.restaurantDataMapper = restaurantDataMapper; - this.restaurantRepository = restaurantRepository; - this.orderApprovalRepository = orderApprovalRepository; - this.orderApprovedMessagePublisher = orderApprovedMessagePublisher; - this.orderRejectedMessagePublisher = orderRejectedMessagePublisher; - } @Transactional - public OrderApprovalEvent persistOrderApproval(RestaurantApprovalRequest request) { - log.info("Persisting order approval request: {}", request); - List failureMessages = new ArrayList<>(); - var restaurant = findRestaurant(request); - var event = restaurantDomainService.validateOrder - (restaurant, failureMessages, orderApprovedMessagePublisher, orderRejectedMessagePublisher); + public void persistOrderApproval(RestaurantApprovalRequest restaurantApprovalRequest) { + if (publishIfOutboxMessageProcessed(restaurantApprovalRequest)) { + log.info("An outbox message with saga id: {} already saved to database!", + restaurantApprovalRequest.getSagaId()); + return; + } + log.info("Processing restaurant approval for order id: {}", restaurantApprovalRequest.getOrderId()); + List failureMessages = new ArrayList<>(); + Restaurant restaurant = findRestaurant(restaurantApprovalRequest); + OrderApprovalEvent orderApprovalEvent = + restaurantDomainService.validateOrder( + restaurant, + failureMessages); orderApprovalRepository.save(restaurant.getOrderApproval()); - return event; + + orderOutboxHelper + .saveOrderOutboxMessage(restaurantDataMapper. + orderApprovalEventToOrderEventPayload(orderApprovalEvent), + orderApprovalEvent.getOrderApproval().getStatus(), + OutboxStatus.STARTED, + UUID.fromString(restaurantApprovalRequest.getSagaId())); } @@ -80,4 +84,16 @@ public class RestaurantApprovalRequestHelper { return restaurant; } + + private boolean publishIfOutboxMessageProcessed(RestaurantApprovalRequest restaurantApprovalRequest) { + Optional orderOutboxMessage = + orderOutboxHelper.getCompletedOrderOutboxMessageBySagaIdAndOutboxStatus(UUID + .fromString(restaurantApprovalRequest.getSagaId()), OutboxStatus.COMPLETED); + if (orderOutboxMessage.isPresent()) { + restaurantApprovalResponseMessagePublisher.publish(orderOutboxMessage.get(), + orderOutboxHelper::updateOutboxStatus); + return true; + } + return false; + } } diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestMessageListenerImpl.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestMessageListenerImpl.java index eb2c020..431bc7c 100644 --- a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestMessageListenerImpl.java +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/RestaurantApprovalRequestMessageListenerImpl.java @@ -14,7 +14,6 @@ public class RestaurantApprovalRequestMessageListenerImpl implements RestaurantA @Override public void approveOrder(RestaurantApprovalRequest request) { - var event = restaurantApprovalRequestHelper.persistOrderApproval(request); - event.fire(); + restaurantApprovalRequestHelper.persistOrderApproval(request); } } diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/mapper/RestaurantDataMapper.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/mapper/RestaurantDataMapper.java index 9caa32f..fdf1ae7 100644 --- a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/mapper/RestaurantDataMapper.java +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/mapper/RestaurantDataMapper.java @@ -3,11 +3,13 @@ package com.food.order.system.restaurant.domain.service.mapper; import com.food.order.system.restaurant.domain.core.entity.OrderDetail; import com.food.order.system.restaurant.domain.core.entity.Product; import com.food.order.system.restaurant.domain.core.entity.Restaurant; +import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent; +import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload; import com.food.order.system.valueobject.Money; import com.food.order.system.valueobject.OrderId; import com.food.order.system.valueobject.OrderStatus; import com.food.order.system.valueobject.RestaurantId; -import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest; import org.springframework.stereotype.Component; import java.util.UUID; @@ -31,4 +33,14 @@ public class RestaurantDataMapper { .build()) .build(); } + public OrderEventPayload + orderApprovalEventToOrderEventPayload(OrderApprovalEvent orderApprovalEvent) { + return OrderEventPayload.builder() + .orderId(orderApprovalEvent.getOrderApproval().getOrderId().getValue().toString()) + .restaurantId(orderApprovalEvent.getRestaurantId().getValue().toString()) + .orderApprovalStatus(orderApprovalEvent.getOrderApproval().getStatus().name()) + .createdAt(orderApprovalEvent.getCreatedAt()) + .failureMessages(orderApprovalEvent.getFailureMessages()) + .build(); + } } diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/model/OrderEventPayload.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/model/OrderEventPayload.java new file mode 100644 index 0000000..3069877 --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/model/OrderEventPayload.java @@ -0,0 +1,32 @@ +package com.food.order.system.restaurant.domain.service.outbox.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; + +import java.time.ZonedDateTime; +import java.util.List; + +@Getter +@Builder +@AllArgsConstructor +public class OrderEventPayload { + + @JsonProperty + private String orderId; + + @JsonProperty + private String restaurantId; + + @JsonProperty + private ZonedDateTime createdAt; + + @JsonProperty + private String orderApprovalStatus; + + @JsonProperty + private List failureMessages; + + +} diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/model/OrderOutboxMessage.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/model/OrderOutboxMessage.java new file mode 100644 index 0000000..a4ed76f --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/model/OrderOutboxMessage.java @@ -0,0 +1,29 @@ +package com.food.order.system.restaurant.domain.service.outbox.model; + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.valueobject.OrderApprovalStatus; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; + +import java.time.ZonedDateTime; +import java.util.UUID; + +@Getter +@Builder +@AllArgsConstructor +public class OrderOutboxMessage { + private UUID id; + private UUID sagaId; + private ZonedDateTime createdAt; + private ZonedDateTime processedAt; + private String type; + private String payload; + private OutboxStatus outboxStatus; + private OrderApprovalStatus approvalStatus; + private int version; + + public void setOutboxStatus(OutboxStatus status) { + this.outboxStatus = status; + } +} diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxCleanerScheduler.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxCleanerScheduler.java new file mode 100644 index 0000000..333f93c --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxCleanerScheduler.java @@ -0,0 +1,34 @@ +package com.food.order.system.restaurant.domain.service.outbox.scheduler; + +import com.food.order.system.outbox.OutboxScheduler; +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; +import java.util.Optional; + +@Slf4j +@Component +@RequiredArgsConstructor +public class OrderOutboxCleanerScheduler implements OutboxScheduler { + + private final OrderOutboxHelper orderOutboxHelper; + @Transactional + @Scheduled(cron = "@midnight") + @Override + public void processOutboxMessage() { + Optional> outboxMessagesResponse = + orderOutboxHelper.getOrderOutboxMessageByOutboxStatus(OutboxStatus.COMPLETED); + if (outboxMessagesResponse.isPresent() && outboxMessagesResponse.get().size() > 0) { + List outboxMessages = outboxMessagesResponse.get(); + log.info("Received {} OrderOutboxMessage for clean-up!", outboxMessages.size()); + orderOutboxHelper.deleteOrderOutboxMessageByOutboxStatus(OutboxStatus.COMPLETED); + log.info("Deleted {} OrderOutboxMessage!", outboxMessages.size()); + } + } +} \ No newline at end of file diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxHelper.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxHelper.java new file mode 100644 index 0000000..e7eee61 --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxHelper.java @@ -0,0 +1,93 @@ +package com.food.order.system.restaurant.domain.service.outbox.scheduler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.restaurant.domain.core.exception.RestaurantDomainException; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.restaurant.domain.service.ports.output.repository.OrderOutboxRepository; +import com.food.order.system.valueobject.OrderApprovalStatus; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static com.food.order.system.DomainConstants.UTC; +import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA; + + +@Slf4j +@Component +@RequiredArgsConstructor +public class OrderOutboxHelper { + + private final OrderOutboxRepository orderOutboxRepository; + private final ObjectMapper objectMapper; + + + @Transactional(readOnly = true) + public Optional getCompletedOrderOutboxMessageBySagaIdAndOutboxStatus(UUID sagaId, + OutboxStatus + outboxStatus) { + return orderOutboxRepository.findByTypeAndSagaIdAndOutboxStatus(ORDER_PROCESSING_SAGA, sagaId, outboxStatus); + } + + @Transactional(readOnly = true) + public Optional> getOrderOutboxMessageByOutboxStatus(OutboxStatus outboxStatus) { + return orderOutboxRepository.findByTypeAndOutboxStatus(ORDER_PROCESSING_SAGA, outboxStatus); + } + + @Transactional + public void deleteOrderOutboxMessageByOutboxStatus(OutboxStatus outboxStatus) { + orderOutboxRepository.deleteByTypeAndOutboxStatus(ORDER_PROCESSING_SAGA, outboxStatus); + } + + @Transactional + public void saveOrderOutboxMessage(OrderEventPayload orderEventPayload, + OrderApprovalStatus approvalStatus, + OutboxStatus outboxStatus, + UUID sagaId) { + save(OrderOutboxMessage.builder() + .id(UUID.randomUUID()) + .sagaId(sagaId) + .createdAt(orderEventPayload.getCreatedAt()) + .processedAt(ZonedDateTime.now(ZoneId.of(UTC))) + .type(ORDER_PROCESSING_SAGA) + .payload(createPayload(orderEventPayload)) + .approvalStatus(approvalStatus) + .outboxStatus(outboxStatus) + .build()); + } + + @Transactional + public void updateOutboxStatus(OrderOutboxMessage orderPaymentOutboxMessage, OutboxStatus outboxStatus) { + orderPaymentOutboxMessage.setOutboxStatus(outboxStatus); + save(orderPaymentOutboxMessage); + log.info("Order outbox table status is updated as: {}", outboxStatus.name()); + } + + private void save(OrderOutboxMessage orderPaymentOutboxMessage) { + OrderOutboxMessage response = orderOutboxRepository.save(orderPaymentOutboxMessage); + if (response == null) { + throw new RestaurantDomainException("Could not save OrderOutboxMessage!"); + } + log.info("OrderOutboxMessage saved with id: {}", orderPaymentOutboxMessage.getId()); + } + + private String createPayload(OrderEventPayload orderEventPayload) { + try { + return objectMapper.writeValueAsString(orderEventPayload); + } catch (JsonProcessingException e) { + log.error("Could not create OrderEventPayload json!", e); + throw new RestaurantDomainException("Could not create OrderEventPayload json!", e); + } + } + +} diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxScheduler.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxScheduler.java new file mode 100644 index 0000000..8df2b4d --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/outbox/scheduler/OrderOutboxScheduler.java @@ -0,0 +1,46 @@ +package com.food.order.system.restaurant.domain.service.outbox.scheduler; + +import com.food.order.system.outbox.OutboxScheduler; +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.RestaurantApprovalResponseMessagePublisher; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Slf4j +@Component +@RequiredArgsConstructor +public class OrderOutboxScheduler implements OutboxScheduler { + + private final OrderOutboxHelper orderOutboxHelper; + private final RestaurantApprovalResponseMessagePublisher responseMessagePublisher; + + @Transactional + @Scheduled(fixedRateString = "${restaurant-service.outbox-scheduler-fixed-rate}", + initialDelayString = "${restaurant-service.outbox-scheduler-initial-delay}") + @Override + public void processOutboxMessage() { + Optional> outboxMessagesResponse = + orderOutboxHelper.getOrderOutboxMessageByOutboxStatus(OutboxStatus.STARTED); + if (outboxMessagesResponse.isPresent() && !outboxMessagesResponse.get().isEmpty()) { + List outboxMessages = outboxMessagesResponse.get(); + log.info("Received {} OrderOutboxMessage with ids {}, sending to message bus!", outboxMessages.size(), + outboxMessages.stream().map(outboxMessage -> + outboxMessage.getId().toString()).collect(Collectors.joining(","))); + outboxMessages.forEach(orderOutboxMessage -> + responseMessagePublisher.publish(orderOutboxMessage, + orderOutboxHelper::updateOutboxStatus)); + log.info("{} OrderOutboxMessage sent to message bus!", outboxMessages.size()); + } + } + + + +} \ No newline at end of file diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/OrderApprovedMessagePublisher.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/OrderApprovedMessagePublisher.java deleted file mode 100644 index 004b435..0000000 --- a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/OrderApprovedMessagePublisher.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.food.order.system.restaurant.domain.service.ports.output.message.publisher; - -import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent; -import com.food.order.system.event.publisher.DomainEventPublisher; - -public interface OrderApprovedMessagePublisher extends DomainEventPublisher { -} - diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/OrderRejectedMessagePublisher.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/OrderRejectedMessagePublisher.java deleted file mode 100644 index d6bbbfc..0000000 --- a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/OrderRejectedMessagePublisher.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.food.order.system.restaurant.domain.service.ports.output.message.publisher; - -import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent; -import com.food.order.system.event.publisher.DomainEventPublisher; - -public interface OrderRejectedMessagePublisher extends DomainEventPublisher { -} - diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/RestaurantApprovalResponseMessagePublisher.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/RestaurantApprovalResponseMessagePublisher.java new file mode 100644 index 0000000..8a2f364 --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/message/publisher/RestaurantApprovalResponseMessagePublisher.java @@ -0,0 +1,12 @@ +package com.food.order.system.restaurant.domain.service.ports.output.message.publisher; + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; + +import java.util.function.BiConsumer; + +public interface RestaurantApprovalResponseMessagePublisher { + + void publish(OrderOutboxMessage orderOutboxMessage, + BiConsumer outboxCallback); +} diff --git a/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/repository/OrderOutboxRepository.java b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/repository/OrderOutboxRepository.java new file mode 100644 index 0000000..9abf63b --- /dev/null +++ b/restaurant-service/restaurant-domain/restaurant-application-service/src/main/java/com/food/order/system/restaurant/domain/service/ports/output/repository/OrderOutboxRepository.java @@ -0,0 +1,22 @@ +package com.food.order.system.restaurant.domain.service.ports.output.repository; + + +import com.food.order.system.outbox.OutboxStatus; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +public interface OrderOutboxRepository { + + OrderOutboxMessage save(OrderOutboxMessage orderOutboxMessage); + + Optional> findByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus); + + Optional findByTypeAndSagaIdAndOutboxStatus(String type, UUID sagaId, + OutboxStatus outboxStatus); + + void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus); + +} diff --git a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainService.java b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainService.java index fbde8f2..d3bcaaa 100644 --- a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainService.java +++ b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainService.java @@ -2,16 +2,11 @@ package com.food.order.system.restaurant.domain.core; import com.food.order.system.restaurant.domain.core.entity.Restaurant; import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent; -import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent; -import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent; -import com.food.order.system.event.publisher.DomainEventPublisher; import java.util.List; public interface RestaurantDomainService { OrderApprovalEvent validateOrder(Restaurant restaurant, - List failureMessages, - DomainEventPublisher publisher, - DomainEventPublisher rejectedPublisher); + List failureMessages); } diff --git a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainServiceImpl.java b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainServiceImpl.java index 9943f81..17eb972 100644 --- a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainServiceImpl.java +++ b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/RestaurantDomainServiceImpl.java @@ -4,7 +4,6 @@ import com.food.order.system.restaurant.domain.core.entity.Restaurant; import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent; import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent; import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent; -import com.food.order.system.event.publisher.DomainEventPublisher; import com.food.order.system.valueobject.OrderApprovalStatus; import lombok.extern.slf4j.Slf4j; @@ -19,21 +18,19 @@ public class RestaurantDomainServiceImpl implements RestaurantDomainService { @Override public OrderApprovalEvent validateOrder(Restaurant restaurant, - List failureMessages, - DomainEventPublisher publisher, - DomainEventPublisher rejectedPublisher) { + List failureMessages) { restaurant.validateOrder(failureMessages); log.info("Order validation with id {}", restaurant.getOrderDetail().getId()); if (failureMessages.isEmpty()) { log.info("Order validation with id {} is successful", restaurant.getOrderDetail().getId()); restaurant.constructOrderApproval(OrderApprovalStatus.APPROVED); return new OrderApprovedEvent(restaurant.getOrderApproval(), restaurant.getId(), - failureMessages, ZonedDateTime.now(ZoneId.of(UTC)), publisher); + failureMessages, ZonedDateTime.now(ZoneId.of(UTC))); } else { log.info("Order validation with id {} is failed", restaurant.getOrderDetail().getId()); restaurant.constructOrderApproval(OrderApprovalStatus.REJECTED); return new OrderRejectedEvent(restaurant.getOrderApproval(), restaurant.getId(), - failureMessages, ZonedDateTime.now(), rejectedPublisher); + failureMessages, ZonedDateTime.now()); } } diff --git a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderApprovedEvent.java b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderApprovedEvent.java index 347db3a..acd3b32 100644 --- a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderApprovedEvent.java +++ b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderApprovedEvent.java @@ -1,7 +1,6 @@ package com.food.order.system.restaurant.domain.core.event; import com.food.order.system.restaurant.domain.core.entity.OrderApproval; -import com.food.order.system.event.publisher.DomainEventPublisher; import com.food.order.system.valueobject.RestaurantId; import java.time.ZonedDateTime; @@ -9,20 +8,11 @@ import java.util.List; public class OrderApprovedEvent extends OrderApprovalEvent { - - private final DomainEventPublisher publisher; - public OrderApprovedEvent(OrderApproval orderApproval, RestaurantId restaurantId, List failureMessages, - ZonedDateTime createdAt, - DomainEventPublisher publisher) { + ZonedDateTime createdAt) { super(orderApproval, restaurantId, failureMessages, createdAt); - this.publisher = publisher; } - @Override - public void fire() { - publisher.publish(this); - } } diff --git a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderRejectedEvent.java b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderRejectedEvent.java index faf3114..1715094 100644 --- a/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderRejectedEvent.java +++ b/restaurant-service/restaurant-domain/restaurant-core-domain/src/main/java/com/food/order/system/restaurant/domain/core/event/OrderRejectedEvent.java @@ -9,19 +9,11 @@ import java.util.List; public class OrderRejectedEvent extends OrderApprovalEvent { - private final DomainEventPublisher publisher; public OrderRejectedEvent(OrderApproval orderApproval, RestaurantId restaurantId, List failureMessages, - ZonedDateTime createdAt, - DomainEventPublisher publisher) { + ZonedDateTime createdAt) { super(orderApproval, restaurantId, failureMessages, createdAt); - this.publisher = publisher; - } - - @Override - public void fire() { - publisher.publish(this); } } diff --git a/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/mapper/RestaurantMessagingDataMapper.java b/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/mapper/RestaurantMessagingDataMapper.java index ba99e1e..560b47d 100644 --- a/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/mapper/RestaurantMessagingDataMapper.java +++ b/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/mapper/RestaurantMessagingDataMapper.java @@ -5,11 +5,10 @@ import com.food.order.system.kafka.order.avro.model.OrderApprovalStatus; import com.food.order.system.kafka.order.avro.model.RestaurantApprovalRequestAvroModel; import com.food.order.system.kafka.order.avro.model.RestaurantApprovalResponseAvroModel; import com.food.order.system.restaurant.domain.core.entity.Product; -import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent; -import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent; +import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload; import com.food.order.system.valueobject.ProductId; import com.food.order.system.valueobject.RestaurantOrderStatus; -import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest; import org.springframework.stereotype.Component; import java.util.UUID; @@ -17,33 +16,6 @@ import java.util.stream.Collectors; @Component public class RestaurantMessagingDataMapper { - public RestaurantApprovalResponseAvroModel - orderApprovedEventToRestaurantApprovalResponseAvroModel(OrderApprovedEvent orderApprovedEvent) { - return RestaurantApprovalResponseAvroModel.newBuilder() - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setOrderId(orderApprovedEvent.getOrderApproval().getOrderId().getValue().toString()) - .setRestaurantId(orderApprovedEvent.getRestaurantId().getValue().toString()) - .setCreatedAt(orderApprovedEvent.getCreatedAt().toInstant()) - .setOrderApprovalStatus(OrderApprovalStatus.valueOf(orderApprovedEvent. - getOrderApproval().getStatus().name())) - .setFailureMessages(orderApprovedEvent.getFailureMessages()) - .build(); - } - - public RestaurantApprovalResponseAvroModel - orderRejectedEventToRestaurantApprovalResponseAvroModel(OrderRejectedEvent orderRejectedEvent) { - return RestaurantApprovalResponseAvroModel.newBuilder() - .setId(UUID.randomUUID().toString()) - .setSagaId("") - .setOrderId(orderRejectedEvent.getOrderApproval().getOrderId().getValue().toString()) - .setRestaurantId(orderRejectedEvent.getRestaurantId().getValue().toString()) - .setCreatedAt(orderRejectedEvent.getCreatedAt().toInstant()) - .setOrderApprovalStatus(OrderApprovalStatus.valueOf(orderRejectedEvent. - getOrderApproval().getStatus().name())) - .setFailureMessages(orderRejectedEvent.getFailureMessages()) - .build(); - } public RestaurantApprovalRequest restaurantApprovalRequestAvroModelToRestaurantApproval(RestaurantApprovalRequestAvroModel @@ -66,4 +38,17 @@ public class RestaurantMessagingDataMapper { .createdAt(restaurantApprovalRequestAvroModel.getCreatedAt()) .build(); } + + public RestaurantApprovalResponseAvroModel + orderEventPayloadToRestaurantApprovalResponseAvroModel(String sagaId, OrderEventPayload orderEventPayload) { + return RestaurantApprovalResponseAvroModel.newBuilder() + .setId(UUID.randomUUID().toString()) + .setSagaId(sagaId) + .setOrderId(orderEventPayload.getOrderId()) + .setRestaurantId(orderEventPayload.getRestaurantId()) + .setCreatedAt(orderEventPayload.getCreatedAt().toInstant()) + .setOrderApprovalStatus(OrderApprovalStatus.valueOf(orderEventPayload.getOrderApprovalStatus())) + .setFailureMessages(orderEventPayload.getFailureMessages()) + .build(); + } } diff --git a/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/OrderRejectedKafkaMessagePublisher.java b/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/OrderRejectedKafkaMessagePublisher.java deleted file mode 100644 index 254a23b..0000000 --- a/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/OrderRejectedKafkaMessagePublisher.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.food.order.system.restaurant.messaging.publisher.kafka; - - -import com.food.order.system.kafka.order.avro.model.RestaurantApprovalResponseAvroModel; -import com.food.order.system.kafka.producer.KafkaMessageHelper; -import com.food.order.system.kafka.producer.service.KafkaProducer; -import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent; -import com.food.order.system.restaurant.messaging.mapper.RestaurantMessagingDataMapper; -import com.food.order.system.restaurant.domain.service.config.RestaurantServiceConfigData; -import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderRejectedMessagePublisher; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Slf4j -@RequiredArgsConstructor -@Component -public class OrderRejectedKafkaMessagePublisher implements OrderRejectedMessagePublisher { - - private final RestaurantMessagingDataMapper restaurantMessagingDataMapper; - private final KafkaProducer kafkaProducer; - private final RestaurantServiceConfigData restaurantServiceConfigData; - private final KafkaMessageHelper kafkaMessageHelper; - - - @Override - public void publish(OrderRejectedEvent orderRejectedEvent) { - String orderId = orderRejectedEvent.getOrderApproval().getOrderId().getValue().toString(); - - log.info("Received OrderRejectedEvent for order id: {}", orderId); - - try { - RestaurantApprovalResponseAvroModel restaurantApprovalResponseAvroModel = - restaurantMessagingDataMapper - .orderRejectedEventToRestaurantApprovalResponseAvroModel(orderRejectedEvent); - - kafkaProducer.send(restaurantServiceConfigData.getRestaurantApprovalResponseTopicName(), - orderId, - restaurantApprovalResponseAvroModel, - kafkaMessageHelper.getKafkaCallBack(restaurantServiceConfigData - .getRestaurantApprovalResponseTopicName(), - restaurantApprovalResponseAvroModel, - orderId, - "RestaurantApprovalResponseAvroModel")); - - log.info("RestaurantApprovalResponseAvroModel sent to kafka at: {}", System.nanoTime()); - } catch (Exception e) { - log.error("Error while sending RestaurantApprovalResponseAvroModel message" + - " to kafka with order id: {}, error: {}", orderId, e.getMessage()); - } - } - -} diff --git a/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/OrderApprovedKafkaMessagePublisher.java b/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/RestaurantApprovalEventKafkaPublisher.java similarity index 51% rename from restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/OrderApprovedKafkaMessagePublisher.java rename to restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/RestaurantApprovalEventKafkaPublisher.java index 001b530..b91ed5d 100644 --- a/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/OrderApprovedKafkaMessagePublisher.java +++ b/restaurant-service/restaurant-messaging/src/main/java/com/food/order/system/restaurant/messaging/publisher/kafka/RestaurantApprovalEventKafkaPublisher.java @@ -1,21 +1,24 @@ package com.food.order.system.restaurant.messaging.publisher.kafka; - import com.food.order.system.kafka.order.avro.model.RestaurantApprovalResponseAvroModel; import com.food.order.system.kafka.producer.KafkaMessageHelper; import com.food.order.system.kafka.producer.service.KafkaProducer; -import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent; -import com.food.order.system.restaurant.messaging.mapper.RestaurantMessagingDataMapper; +import com.food.order.system.outbox.OutboxStatus; import com.food.order.system.restaurant.domain.service.config.RestaurantServiceConfigData; -import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderApprovedMessagePublisher; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload; +import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage; +import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.RestaurantApprovalResponseMessagePublisher; +import com.food.order.system.restaurant.messaging.mapper.RestaurantMessagingDataMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.function.BiConsumer; + @Slf4j -@Component @RequiredArgsConstructor -public class OrderApprovedKafkaMessagePublisher implements OrderApprovedMessagePublisher { +@Component +public class RestaurantApprovalEventKafkaPublisher implements RestaurantApprovalResponseMessagePublisher { private final RestaurantMessagingDataMapper restaurantMessagingDataMapper; private final KafkaProducer kafkaProducer; @@ -24,29 +27,39 @@ public class OrderApprovedKafkaMessagePublisher implements OrderApprovedMessageP @Override - public void publish(OrderApprovedEvent orderApprovedEvent) { - String orderId = orderApprovedEvent.getOrderApproval().getOrderId().getValue().toString(); + public void publish(OrderOutboxMessage orderOutboxMessage, + BiConsumer outboxCallback) { + OrderEventPayload orderEventPayload = + kafkaMessageHelper.getOrderEventPayload(orderOutboxMessage.getPayload(), + OrderEventPayload.class); - log.info("Received OrderApprovedEvent for order id: {}", orderId); + String sagaId = orderOutboxMessage.getSagaId().toString(); + log.info("Received OrderOutboxMessage for order id: {} and saga id: {}", + orderEventPayload.getOrderId(), + sagaId); try { RestaurantApprovalResponseAvroModel restaurantApprovalResponseAvroModel = restaurantMessagingDataMapper - .orderApprovedEventToRestaurantApprovalResponseAvroModel(orderApprovedEvent); + .orderEventPayloadToRestaurantApprovalResponseAvroModel(sagaId, orderEventPayload); kafkaProducer.send(restaurantServiceConfigData.getRestaurantApprovalResponseTopicName(), - orderId, + sagaId, restaurantApprovalResponseAvroModel, - kafkaMessageHelper.getKafkaCallBack(restaurantServiceConfigData + kafkaMessageHelper.getKafkaCallback(restaurantServiceConfigData .getRestaurantApprovalResponseTopicName(), restaurantApprovalResponseAvroModel, - orderId, + orderOutboxMessage, + outboxCallback, + orderEventPayload.getOrderId(), "RestaurantApprovalResponseAvroModel")); - log.info("RestaurantApprovalResponseAvroModel sent to kafka at: {}", System.nanoTime()); + log.info("RestaurantApprovalResponseAvroModel sent to kafka for order id: {} and saga id: {}", + restaurantApprovalResponseAvroModel.getOrderId(), sagaId); } catch (Exception e) { log.error("Error while sending RestaurantApprovalResponseAvroModel message" + - " to kafka with order id: {}, error: {}", orderId, e.getMessage()); + " to kafka with order id: {} and saga id: {}, error: {}", + orderEventPayload.getOrderId(), sagaId, e.getMessage()); } }