Outbox Message and Scheduler class implemented part - 6 and finish.

This commit is contained in:
Ali CANLI
2022-07-16 22:55:17 +03:00
parent 2ff4b7b3d5
commit 3626ed70de
29 changed files with 620 additions and 212 deletions

View File

@@ -8,6 +8,8 @@ logging:
restaurant-service:
restaurant-approval-request-topic-name: restaurant-approval-request-value
restaurant-approval-response-topic-name: restaurant-approval-response-value
outbox-scheduler-fixed-rate: 10000
outbox-scheduler-initial-delay: 10000
spring:
jpa:

View File

@@ -1,22 +1,22 @@
INSERT INTO restaurant.restaurants(id, name, active)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb45', 'restaurant_1', TRUE);
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb45', 'restaurant_1', TRUE);
INSERT INTO restaurant.restaurants(id, name, active)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb46', 'restaurant_2', FALSE);
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb46', 'restaurant_2', FALSE);
INSERT INTO restaurant.products(id, name, price, available)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb47', 'product_1', 25.00, FALSE);
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb47', 'product_1', 25.00, FALSE);
INSERT INTO restaurant.products(id, name, price, available)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb48', 'product_2', 50.00, TRUE);
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb48', 'product_2', 50.00, TRUE);
INSERT INTO restaurant.products(id, name, price, available)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb49', 'product_3', 20.00, FALSE);
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb49', 'product_3', 20.00, FALSE);
INSERT INTO restaurant.products(id, name, price, available)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb50', 'product_4', 40.00, TRUE);
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb50', 'product_4', 40.00, TRUE);
INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb51', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb47');
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb51', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb47');
INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb52', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb48');
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb52', 'd215b5f8-0249-4dc5-89a3-51fd148cfb45', 'd215b5f8-0249-4dc5-89a3-51fd148cfb48');
INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb53', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb49');
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb53', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb49');
INSERT INTO restaurant.restaurant_products(id, restaurant_id, product_id)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb54', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb50');
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb54', 'd215b5f8-0249-4dc5-89a3-51fd148cfb46', 'd215b5f8-0249-4dc5-89a3-51fd148cfb50');

View File

@@ -52,34 +52,61 @@ CREATE TABLE restaurant.restaurant_products
ALTER TABLE restaurant.restaurant_products
ADD CONSTRAINT "FK_RESTAURANT_ID" FOREIGN KEY (restaurant_id)
REFERENCES restaurant.restaurants (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE RESTRICT
NOT VALID;
REFERENCES restaurant.restaurants (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE RESTRICT
NOT VALID;
ALTER TABLE restaurant.restaurant_products
ADD CONSTRAINT "FK_PRODUCT_ID" FOREIGN KEY (product_id)
REFERENCES restaurant.products (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE RESTRICT
NOT VALID;
REFERENCES restaurant.products (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE RESTRICT
NOT VALID;
DROP TYPE IF EXISTS outbox_status;
CREATE TYPE outbox_status AS ENUM ('STARTED', 'COMPLETED', 'FAILED');
DROP TABLE IF EXISTS restaurant.order_outbox CASCADE;
CREATE TABLE restaurant.order_outbox
(
id uuid NOT NULL,
saga_id uuid NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
processed_at TIMESTAMP WITH TIME ZONE,
type character varying COLLATE pg_catalog."default" NOT NULL,
payload jsonb NOT NULL,
outbox_status outbox_status NOT NULL,
approval_status approval_status NOT NULL,
version integer NOT NULL,
CONSTRAINT order_outbox_pkey PRIMARY KEY (id)
);
CREATE INDEX "restaurant_order_outbox_saga_status"
ON "restaurant".order_outbox
(type, approval_status);
CREATE UNIQUE INDEX "restaurant_order_outbox_saga_id"
ON "restaurant".order_outbox
(type, saga_id, approval_status, outbox_status);
DROP MATERIALIZED VIEW IF EXISTS restaurant.order_restaurant_m_view;
CREATE MATERIALIZED VIEW restaurant.order_restaurant_m_view
TABLESPACE pg_default
TABLESPACE pg_default
AS
SELECT r.id AS restaurant_id,
r.name AS restaurant_name,
r.active AS restaurant_active,
p.id AS product_id,
p.name AS product_name,
p.price AS product_price,
p.available AS product_active
FROM restaurant.restaurants r,
restaurant.products p,
restaurant.restaurant_products rp
WHERE r.id = rp.restaurant_id AND p.id = rp.product_id
SELECT r.id AS restaurant_id,
r.name AS restaurant_name,
r.active AS restaurant_active,
p.id AS product_id,
p.name AS product_name,
p.price AS product_price,
p.available AS product_available
FROM restaurant.restaurants r,
restaurant.products p,
restaurant.restaurant_products rp
WHERE r.id = rp.restaurant_id AND p.id = rp.product_id
WITH DATA;
refresh materialized VIEW restaurant.order_restaurant_m_view;
@@ -87,17 +114,17 @@ refresh materialized VIEW restaurant.order_restaurant_m_view;
DROP function IF EXISTS restaurant.refresh_order_restaurant_m_view;
CREATE OR replace function restaurant.refresh_order_restaurant_m_view()
returns trigger
returns trigger
AS '
BEGIN
refresh materialized VIEW restaurant.order_restaurant_m_view;
return null;
END;
BEGIN
refresh materialized VIEW restaurant.order_restaurant_m_view;
return null;
END;
' LANGUAGE plpgsql;
DROP trigger IF EXISTS refresh_order_restaurant_m_view ON restaurant.restaurant_products;
CREATE trigger refresh_order_restaurant_m_view
after INSERT OR UPDATE OR DELETE OR truncate
ON restaurant.restaurant_products FOR each statement
after INSERT OR UPDATE OR DELETE OR truncate
ON restaurant.restaurant_products FOR each statement
EXECUTE PROCEDURE restaurant.refresh_order_restaurant_m_view();

View File

@@ -18,7 +18,10 @@
<groupId>com.food.order</groupId>
<artifactId>restaurant-application-service</artifactId>
</dependency>
<dependency>
<groupId>com.food.order</groupId>
<artifactId>outbox</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>

View File

@@ -0,0 +1,54 @@
package com.food.order.system.data.access.restaurant.outbox.adapter;
import com.food.order.system.data.access.restaurant.outbox.exception.OrderOutboxNotFoundException;
import com.food.order.system.data.access.restaurant.outbox.mapper.OrderOutboxDataAccessMapper;
import com.food.order.system.data.access.restaurant.outbox.repository.OrderOutboxJpaRepository;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.restaurant.domain.service.ports.output.repository.OrderOutboxRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
public class OrderOutboxRepositoryImpl implements OrderOutboxRepository {
private final OrderOutboxJpaRepository orderOutboxJpaRepository;
private final OrderOutboxDataAccessMapper orderOutboxDataAccessMapper;
@Override
public OrderOutboxMessage save(OrderOutboxMessage orderPaymentOutboxMessage) {
return orderOutboxDataAccessMapper
.orderOutboxEntityToOrderOutboxMessage(orderOutboxJpaRepository
.save(orderOutboxDataAccessMapper
.orderOutboxMessageToOutboxEntity(orderPaymentOutboxMessage)));
}
@Override
public Optional<List<OrderOutboxMessage>> findByTypeAndOutboxStatus(String sagaType, OutboxStatus outboxStatus) {
return Optional.of(orderOutboxJpaRepository.findByTypeAndOutboxStatus(sagaType, outboxStatus)
.orElseThrow(() -> new OrderOutboxNotFoundException("Approval outbox object " +
"cannot be found for saga type " + sagaType))
.stream()
.map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage)
.collect(Collectors.toList()));
}
@Override
public Optional<OrderOutboxMessage> findByTypeAndSagaIdAndOutboxStatus(String type, UUID sagaId,
OutboxStatus outboxStatus) {
return orderOutboxJpaRepository.findByTypeAndSagaIdAndOutboxStatus(type, sagaId, outboxStatus)
.map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage);
}
@Override
public void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus) {
orderOutboxJpaRepository.deleteByTypeAndOutboxStatus(type, outboxStatus);
}
}

View File

@@ -0,0 +1,48 @@
package com.food.order.system.data.access.restaurant.outbox.entity;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.valueobject.OrderApprovalStatus;
import lombok.*;
import javax.persistence.*;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.UUID;
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "order_outbox")
@Entity
public class OrderOutboxEntity {
@Id
private UUID id;
private UUID sagaId;
private ZonedDateTime createdAt;
private ZonedDateTime processedAt;
private String type;
private String payload;
@Enumerated(EnumType.STRING)
private OutboxStatus outboxStatus;
@Enumerated(EnumType.STRING)
private OrderApprovalStatus approvalStatus;
private int version;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OrderOutboxEntity that = (OrderOutboxEntity) o;
return id.equals(that.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}

View File

@@ -0,0 +1,8 @@
package com.food.order.system.data.access.restaurant.outbox.exception;
public class OrderOutboxNotFoundException extends RuntimeException {
public OrderOutboxNotFoundException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,36 @@
package com.food.order.system.data.access.restaurant.outbox.mapper;
import com.food.order.system.data.access.restaurant.outbox.entity.OrderOutboxEntity;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import org.springframework.stereotype.Component;
@Component
public class OrderOutboxDataAccessMapper {
public OrderOutboxEntity orderOutboxMessageToOutboxEntity(OrderOutboxMessage orderOutboxMessage) {
return OrderOutboxEntity.builder()
.id(orderOutboxMessage.getId())
.sagaId(orderOutboxMessage.getSagaId())
.createdAt(orderOutboxMessage.getCreatedAt())
.type(orderOutboxMessage.getType())
.payload(orderOutboxMessage.getPayload())
.outboxStatus(orderOutboxMessage.getOutboxStatus())
.approvalStatus(orderOutboxMessage.getApprovalStatus())
.version(orderOutboxMessage.getVersion())
.build();
}
public OrderOutboxMessage orderOutboxEntityToOrderOutboxMessage(OrderOutboxEntity paymentOutboxEntity) {
return OrderOutboxMessage.builder()
.id(paymentOutboxEntity.getId())
.sagaId(paymentOutboxEntity.getSagaId())
.createdAt(paymentOutboxEntity.getCreatedAt())
.type(paymentOutboxEntity.getType())
.payload(paymentOutboxEntity.getPayload())
.outboxStatus(paymentOutboxEntity.getOutboxStatus())
.approvalStatus(paymentOutboxEntity.getApprovalStatus())
.version(paymentOutboxEntity.getVersion())
.build();
}
}

View File

@@ -0,0 +1,21 @@
package com.food.order.system.data.access.restaurant.outbox.repository;
import com.food.order.system.data.access.restaurant.outbox.entity.OrderOutboxEntity;
import com.food.order.system.outbox.OutboxStatus;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Repository
public interface OrderOutboxJpaRepository extends JpaRepository<OrderOutboxEntity, UUID> {
Optional<List<OrderOutboxEntity>> findByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus);
Optional<OrderOutboxEntity> findByTypeAndSagaIdAndOutboxStatus(String type, UUID sagaId, OutboxStatus outboxStatus);
void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus);
}

View File

@@ -33,7 +33,18 @@
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>com.food.order</groupId>
<artifactId>outbox</artifactId>
</dependency>
<dependency>
<groupId>com.food.order</groupId>
<artifactId>saga</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,16 +1,19 @@
package com.food.order.system.restaurant.domain.service;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.core.RestaurantDomainService;
import com.food.order.system.restaurant.domain.core.entity.Restaurant;
import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent;
import com.food.order.system.restaurant.domain.core.exception.RestaurantNotFoundException;
import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest;
import com.food.order.system.restaurant.domain.service.mapper.RestaurantDataMapper;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderApprovedMessagePublisher;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderRejectedMessagePublisher;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.restaurant.domain.service.outbox.scheduler.OrderOutboxHelper;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.RestaurantApprovalResponseMessagePublisher;
import com.food.order.system.restaurant.domain.service.ports.output.repository.OrderApprovalRepository;
import com.food.order.system.restaurant.domain.service.ports.output.repository.RestaurantRepository;
import com.food.order.system.valueobject.OrderId;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -22,39 +25,40 @@ import java.util.UUID;
@Slf4j
@Component
@RequiredArgsConstructor
public class RestaurantApprovalRequestHelper {
private final RestaurantDomainService restaurantDomainService;
private final RestaurantDataMapper restaurantDataMapper;
private final RestaurantRepository restaurantRepository;
private final OrderApprovalRepository orderApprovalRepository;
private final OrderApprovedMessagePublisher orderApprovedMessagePublisher;
private final OrderRejectedMessagePublisher orderRejectedMessagePublisher;
private final OrderOutboxHelper orderOutboxHelper;
private final RestaurantApprovalResponseMessagePublisher restaurantApprovalResponseMessagePublisher;
public RestaurantApprovalRequestHelper(RestaurantDomainService restaurantDomainService,
RestaurantDataMapper restaurantDataMapper,
RestaurantRepository restaurantRepository,
OrderApprovalRepository orderApprovalRepository,
OrderApprovedMessagePublisher orderApprovedMessagePublisher,
OrderRejectedMessagePublisher orderRejectedMessagePublisher) {
this.restaurantDomainService = restaurantDomainService;
this.restaurantDataMapper = restaurantDataMapper;
this.restaurantRepository = restaurantRepository;
this.orderApprovalRepository = orderApprovalRepository;
this.orderApprovedMessagePublisher = orderApprovedMessagePublisher;
this.orderRejectedMessagePublisher = orderRejectedMessagePublisher;
}
@Transactional
public OrderApprovalEvent persistOrderApproval(RestaurantApprovalRequest request) {
log.info("Persisting order approval request: {}", request);
List<String> failureMessages = new ArrayList<>();
var restaurant = findRestaurant(request);
var event = restaurantDomainService.validateOrder
(restaurant, failureMessages, orderApprovedMessagePublisher, orderRejectedMessagePublisher);
public void persistOrderApproval(RestaurantApprovalRequest restaurantApprovalRequest) {
if (publishIfOutboxMessageProcessed(restaurantApprovalRequest)) {
log.info("An outbox message with saga id: {} already saved to database!",
restaurantApprovalRequest.getSagaId());
return;
}
log.info("Processing restaurant approval for order id: {}", restaurantApprovalRequest.getOrderId());
List<String> failureMessages = new ArrayList<>();
Restaurant restaurant = findRestaurant(restaurantApprovalRequest);
OrderApprovalEvent orderApprovalEvent =
restaurantDomainService.validateOrder(
restaurant,
failureMessages);
orderApprovalRepository.save(restaurant.getOrderApproval());
return event;
orderOutboxHelper
.saveOrderOutboxMessage(restaurantDataMapper.
orderApprovalEventToOrderEventPayload(orderApprovalEvent),
orderApprovalEvent.getOrderApproval().getStatus(),
OutboxStatus.STARTED,
UUID.fromString(restaurantApprovalRequest.getSagaId()));
}
@@ -80,4 +84,16 @@ public class RestaurantApprovalRequestHelper {
return restaurant;
}
private boolean publishIfOutboxMessageProcessed(RestaurantApprovalRequest restaurantApprovalRequest) {
Optional<OrderOutboxMessage> orderOutboxMessage =
orderOutboxHelper.getCompletedOrderOutboxMessageBySagaIdAndOutboxStatus(UUID
.fromString(restaurantApprovalRequest.getSagaId()), OutboxStatus.COMPLETED);
if (orderOutboxMessage.isPresent()) {
restaurantApprovalResponseMessagePublisher.publish(orderOutboxMessage.get(),
orderOutboxHelper::updateOutboxStatus);
return true;
}
return false;
}
}

View File

@@ -14,7 +14,6 @@ public class RestaurantApprovalRequestMessageListenerImpl implements RestaurantA
@Override
public void approveOrder(RestaurantApprovalRequest request) {
var event = restaurantApprovalRequestHelper.persistOrderApproval(request);
event.fire();
restaurantApprovalRequestHelper.persistOrderApproval(request);
}
}

View File

@@ -3,11 +3,13 @@ package com.food.order.system.restaurant.domain.service.mapper;
import com.food.order.system.restaurant.domain.core.entity.OrderDetail;
import com.food.order.system.restaurant.domain.core.entity.Product;
import com.food.order.system.restaurant.domain.core.entity.Restaurant;
import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent;
import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload;
import com.food.order.system.valueobject.Money;
import com.food.order.system.valueobject.OrderId;
import com.food.order.system.valueobject.OrderStatus;
import com.food.order.system.valueobject.RestaurantId;
import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest;
import org.springframework.stereotype.Component;
import java.util.UUID;
@@ -31,4 +33,14 @@ public class RestaurantDataMapper {
.build())
.build();
}
public OrderEventPayload
orderApprovalEventToOrderEventPayload(OrderApprovalEvent orderApprovalEvent) {
return OrderEventPayload.builder()
.orderId(orderApprovalEvent.getOrderApproval().getOrderId().getValue().toString())
.restaurantId(orderApprovalEvent.getRestaurantId().getValue().toString())
.orderApprovalStatus(orderApprovalEvent.getOrderApproval().getStatus().name())
.createdAt(orderApprovalEvent.getCreatedAt())
.failureMessages(orderApprovalEvent.getFailureMessages())
.build();
}
}

View File

@@ -0,0 +1,32 @@
package com.food.order.system.restaurant.domain.service.outbox.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import java.time.ZonedDateTime;
import java.util.List;
@Getter
@Builder
@AllArgsConstructor
public class OrderEventPayload {
@JsonProperty
private String orderId;
@JsonProperty
private String restaurantId;
@JsonProperty
private ZonedDateTime createdAt;
@JsonProperty
private String orderApprovalStatus;
@JsonProperty
private List<String> failureMessages;
}

View File

@@ -0,0 +1,29 @@
package com.food.order.system.restaurant.domain.service.outbox.model;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.valueobject.OrderApprovalStatus;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import java.time.ZonedDateTime;
import java.util.UUID;
@Getter
@Builder
@AllArgsConstructor
public class OrderOutboxMessage {
private UUID id;
private UUID sagaId;
private ZonedDateTime createdAt;
private ZonedDateTime processedAt;
private String type;
private String payload;
private OutboxStatus outboxStatus;
private OrderApprovalStatus approvalStatus;
private int version;
public void setOutboxStatus(OutboxStatus status) {
this.outboxStatus = status;
}
}

View File

@@ -0,0 +1,34 @@
package com.food.order.system.restaurant.domain.service.outbox.scheduler;
import com.food.order.system.outbox.OutboxScheduler;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderOutboxCleanerScheduler implements OutboxScheduler {
private final OrderOutboxHelper orderOutboxHelper;
@Transactional
@Scheduled(cron = "@midnight")
@Override
public void processOutboxMessage() {
Optional<List<OrderOutboxMessage>> outboxMessagesResponse =
orderOutboxHelper.getOrderOutboxMessageByOutboxStatus(OutboxStatus.COMPLETED);
if (outboxMessagesResponse.isPresent() && outboxMessagesResponse.get().size() > 0) {
List<OrderOutboxMessage> outboxMessages = outboxMessagesResponse.get();
log.info("Received {} OrderOutboxMessage for clean-up!", outboxMessages.size());
orderOutboxHelper.deleteOrderOutboxMessageByOutboxStatus(OutboxStatus.COMPLETED);
log.info("Deleted {} OrderOutboxMessage!", outboxMessages.size());
}
}
}

View File

@@ -0,0 +1,93 @@
package com.food.order.system.restaurant.domain.service.outbox.scheduler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.core.exception.RestaurantDomainException;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.restaurant.domain.service.ports.output.repository.OrderOutboxRepository;
import com.food.order.system.valueobject.OrderApprovalStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.food.order.system.DomainConstants.UTC;
import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderOutboxHelper {
private final OrderOutboxRepository orderOutboxRepository;
private final ObjectMapper objectMapper;
@Transactional(readOnly = true)
public Optional<OrderOutboxMessage> getCompletedOrderOutboxMessageBySagaIdAndOutboxStatus(UUID sagaId,
OutboxStatus
outboxStatus) {
return orderOutboxRepository.findByTypeAndSagaIdAndOutboxStatus(ORDER_PROCESSING_SAGA, sagaId, outboxStatus);
}
@Transactional(readOnly = true)
public Optional<List<OrderOutboxMessage>> getOrderOutboxMessageByOutboxStatus(OutboxStatus outboxStatus) {
return orderOutboxRepository.findByTypeAndOutboxStatus(ORDER_PROCESSING_SAGA, outboxStatus);
}
@Transactional
public void deleteOrderOutboxMessageByOutboxStatus(OutboxStatus outboxStatus) {
orderOutboxRepository.deleteByTypeAndOutboxStatus(ORDER_PROCESSING_SAGA, outboxStatus);
}
@Transactional
public void saveOrderOutboxMessage(OrderEventPayload orderEventPayload,
OrderApprovalStatus approvalStatus,
OutboxStatus outboxStatus,
UUID sagaId) {
save(OrderOutboxMessage.builder()
.id(UUID.randomUUID())
.sagaId(sagaId)
.createdAt(orderEventPayload.getCreatedAt())
.processedAt(ZonedDateTime.now(ZoneId.of(UTC)))
.type(ORDER_PROCESSING_SAGA)
.payload(createPayload(orderEventPayload))
.approvalStatus(approvalStatus)
.outboxStatus(outboxStatus)
.build());
}
@Transactional
public void updateOutboxStatus(OrderOutboxMessage orderPaymentOutboxMessage, OutboxStatus outboxStatus) {
orderPaymentOutboxMessage.setOutboxStatus(outboxStatus);
save(orderPaymentOutboxMessage);
log.info("Order outbox table status is updated as: {}", outboxStatus.name());
}
private void save(OrderOutboxMessage orderPaymentOutboxMessage) {
OrderOutboxMessage response = orderOutboxRepository.save(orderPaymentOutboxMessage);
if (response == null) {
throw new RestaurantDomainException("Could not save OrderOutboxMessage!");
}
log.info("OrderOutboxMessage saved with id: {}", orderPaymentOutboxMessage.getId());
}
private String createPayload(OrderEventPayload orderEventPayload) {
try {
return objectMapper.writeValueAsString(orderEventPayload);
} catch (JsonProcessingException e) {
log.error("Could not create OrderEventPayload json!", e);
throw new RestaurantDomainException("Could not create OrderEventPayload json!", e);
}
}
}

View File

@@ -0,0 +1,46 @@
package com.food.order.system.restaurant.domain.service.outbox.scheduler;
import com.food.order.system.outbox.OutboxScheduler;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.RestaurantApprovalResponseMessagePublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderOutboxScheduler implements OutboxScheduler {
private final OrderOutboxHelper orderOutboxHelper;
private final RestaurantApprovalResponseMessagePublisher responseMessagePublisher;
@Transactional
@Scheduled(fixedRateString = "${restaurant-service.outbox-scheduler-fixed-rate}",
initialDelayString = "${restaurant-service.outbox-scheduler-initial-delay}")
@Override
public void processOutboxMessage() {
Optional<List<OrderOutboxMessage>> outboxMessagesResponse =
orderOutboxHelper.getOrderOutboxMessageByOutboxStatus(OutboxStatus.STARTED);
if (outboxMessagesResponse.isPresent() && !outboxMessagesResponse.get().isEmpty()) {
List<OrderOutboxMessage> outboxMessages = outboxMessagesResponse.get();
log.info("Received {} OrderOutboxMessage with ids {}, sending to message bus!", outboxMessages.size(),
outboxMessages.stream().map(outboxMessage ->
outboxMessage.getId().toString()).collect(Collectors.joining(",")));
outboxMessages.forEach(orderOutboxMessage ->
responseMessagePublisher.publish(orderOutboxMessage,
orderOutboxHelper::updateOutboxStatus));
log.info("{} OrderOutboxMessage sent to message bus!", outboxMessages.size());
}
}
}

View File

@@ -1,8 +0,0 @@
package com.food.order.system.restaurant.domain.service.ports.output.message.publisher;
import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent;
import com.food.order.system.event.publisher.DomainEventPublisher;
public interface OrderApprovedMessagePublisher extends DomainEventPublisher<OrderApprovedEvent> {
}

View File

@@ -1,8 +0,0 @@
package com.food.order.system.restaurant.domain.service.ports.output.message.publisher;
import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent;
import com.food.order.system.event.publisher.DomainEventPublisher;
public interface OrderRejectedMessagePublisher extends DomainEventPublisher<OrderRejectedEvent> {
}

View File

@@ -0,0 +1,12 @@
package com.food.order.system.restaurant.domain.service.ports.output.message.publisher;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import java.util.function.BiConsumer;
public interface RestaurantApprovalResponseMessagePublisher {
void publish(OrderOutboxMessage orderOutboxMessage,
BiConsumer<OrderOutboxMessage, OutboxStatus> outboxCallback);
}

View File

@@ -0,0 +1,22 @@
package com.food.order.system.restaurant.domain.service.ports.output.repository;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
public interface OrderOutboxRepository {
OrderOutboxMessage save(OrderOutboxMessage orderOutboxMessage);
Optional<List<OrderOutboxMessage>> findByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus);
Optional<OrderOutboxMessage> findByTypeAndSagaIdAndOutboxStatus(String type, UUID sagaId,
OutboxStatus outboxStatus);
void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus);
}

View File

@@ -2,16 +2,11 @@ package com.food.order.system.restaurant.domain.core;
import com.food.order.system.restaurant.domain.core.entity.Restaurant;
import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent;
import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent;
import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent;
import com.food.order.system.event.publisher.DomainEventPublisher;
import java.util.List;
public interface RestaurantDomainService {
OrderApprovalEvent validateOrder(Restaurant restaurant,
List<String> failureMessages,
DomainEventPublisher<OrderApprovedEvent> publisher,
DomainEventPublisher<OrderRejectedEvent> rejectedPublisher);
List<String> failureMessages);
}

View File

@@ -4,7 +4,6 @@ import com.food.order.system.restaurant.domain.core.entity.Restaurant;
import com.food.order.system.restaurant.domain.core.event.OrderApprovalEvent;
import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent;
import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent;
import com.food.order.system.event.publisher.DomainEventPublisher;
import com.food.order.system.valueobject.OrderApprovalStatus;
import lombok.extern.slf4j.Slf4j;
@@ -19,21 +18,19 @@ public class RestaurantDomainServiceImpl implements RestaurantDomainService {
@Override
public OrderApprovalEvent validateOrder(Restaurant restaurant,
List<String> failureMessages,
DomainEventPublisher<OrderApprovedEvent> publisher,
DomainEventPublisher<OrderRejectedEvent> rejectedPublisher) {
List<String> failureMessages) {
restaurant.validateOrder(failureMessages);
log.info("Order validation with id {}", restaurant.getOrderDetail().getId());
if (failureMessages.isEmpty()) {
log.info("Order validation with id {} is successful", restaurant.getOrderDetail().getId());
restaurant.constructOrderApproval(OrderApprovalStatus.APPROVED);
return new OrderApprovedEvent(restaurant.getOrderApproval(), restaurant.getId(),
failureMessages, ZonedDateTime.now(ZoneId.of(UTC)), publisher);
failureMessages, ZonedDateTime.now(ZoneId.of(UTC)));
} else {
log.info("Order validation with id {} is failed", restaurant.getOrderDetail().getId());
restaurant.constructOrderApproval(OrderApprovalStatus.REJECTED);
return new OrderRejectedEvent(restaurant.getOrderApproval(), restaurant.getId(),
failureMessages, ZonedDateTime.now(), rejectedPublisher);
failureMessages, ZonedDateTime.now());
}
}

View File

@@ -1,7 +1,6 @@
package com.food.order.system.restaurant.domain.core.event;
import com.food.order.system.restaurant.domain.core.entity.OrderApproval;
import com.food.order.system.event.publisher.DomainEventPublisher;
import com.food.order.system.valueobject.RestaurantId;
import java.time.ZonedDateTime;
@@ -9,20 +8,11 @@ import java.util.List;
public class OrderApprovedEvent extends OrderApprovalEvent {
private final DomainEventPublisher<OrderApprovedEvent> publisher;
public OrderApprovedEvent(OrderApproval orderApproval,
RestaurantId restaurantId,
List<String> failureMessages,
ZonedDateTime createdAt,
DomainEventPublisher<OrderApprovedEvent> publisher) {
ZonedDateTime createdAt) {
super(orderApproval, restaurantId, failureMessages, createdAt);
this.publisher = publisher;
}
@Override
public void fire() {
publisher.publish(this);
}
}

View File

@@ -9,19 +9,11 @@ import java.util.List;
public class OrderRejectedEvent extends OrderApprovalEvent {
private final DomainEventPublisher<OrderRejectedEvent> publisher;
public OrderRejectedEvent(OrderApproval orderApproval,
RestaurantId restaurantId,
List<String> failureMessages,
ZonedDateTime createdAt,
DomainEventPublisher<OrderRejectedEvent> publisher) {
ZonedDateTime createdAt) {
super(orderApproval, restaurantId, failureMessages, createdAt);
this.publisher = publisher;
}
@Override
public void fire() {
publisher.publish(this);
}
}

View File

@@ -5,11 +5,10 @@ import com.food.order.system.kafka.order.avro.model.OrderApprovalStatus;
import com.food.order.system.kafka.order.avro.model.RestaurantApprovalRequestAvroModel;
import com.food.order.system.kafka.order.avro.model.RestaurantApprovalResponseAvroModel;
import com.food.order.system.restaurant.domain.core.entity.Product;
import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent;
import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent;
import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload;
import com.food.order.system.valueobject.ProductId;
import com.food.order.system.valueobject.RestaurantOrderStatus;
import com.food.order.system.restaurant.domain.service.dto.RestaurantApprovalRequest;
import org.springframework.stereotype.Component;
import java.util.UUID;
@@ -17,33 +16,6 @@ import java.util.stream.Collectors;
@Component
public class RestaurantMessagingDataMapper {
public RestaurantApprovalResponseAvroModel
orderApprovedEventToRestaurantApprovalResponseAvroModel(OrderApprovedEvent orderApprovedEvent) {
return RestaurantApprovalResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setOrderId(orderApprovedEvent.getOrderApproval().getOrderId().getValue().toString())
.setRestaurantId(orderApprovedEvent.getRestaurantId().getValue().toString())
.setCreatedAt(orderApprovedEvent.getCreatedAt().toInstant())
.setOrderApprovalStatus(OrderApprovalStatus.valueOf(orderApprovedEvent.
getOrderApproval().getStatus().name()))
.setFailureMessages(orderApprovedEvent.getFailureMessages())
.build();
}
public RestaurantApprovalResponseAvroModel
orderRejectedEventToRestaurantApprovalResponseAvroModel(OrderRejectedEvent orderRejectedEvent) {
return RestaurantApprovalResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setOrderId(orderRejectedEvent.getOrderApproval().getOrderId().getValue().toString())
.setRestaurantId(orderRejectedEvent.getRestaurantId().getValue().toString())
.setCreatedAt(orderRejectedEvent.getCreatedAt().toInstant())
.setOrderApprovalStatus(OrderApprovalStatus.valueOf(orderRejectedEvent.
getOrderApproval().getStatus().name()))
.setFailureMessages(orderRejectedEvent.getFailureMessages())
.build();
}
public RestaurantApprovalRequest
restaurantApprovalRequestAvroModelToRestaurantApproval(RestaurantApprovalRequestAvroModel
@@ -66,4 +38,17 @@ public class RestaurantMessagingDataMapper {
.createdAt(restaurantApprovalRequestAvroModel.getCreatedAt())
.build();
}
public RestaurantApprovalResponseAvroModel
orderEventPayloadToRestaurantApprovalResponseAvroModel(String sagaId, OrderEventPayload orderEventPayload) {
return RestaurantApprovalResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId(sagaId)
.setOrderId(orderEventPayload.getOrderId())
.setRestaurantId(orderEventPayload.getRestaurantId())
.setCreatedAt(orderEventPayload.getCreatedAt().toInstant())
.setOrderApprovalStatus(OrderApprovalStatus.valueOf(orderEventPayload.getOrderApprovalStatus()))
.setFailureMessages(orderEventPayload.getFailureMessages())
.build();
}
}

View File

@@ -1,53 +0,0 @@
package com.food.order.system.restaurant.messaging.publisher.kafka;
import com.food.order.system.kafka.order.avro.model.RestaurantApprovalResponseAvroModel;
import com.food.order.system.kafka.producer.KafkaMessageHelper;
import com.food.order.system.kafka.producer.service.KafkaProducer;
import com.food.order.system.restaurant.domain.core.event.OrderRejectedEvent;
import com.food.order.system.restaurant.messaging.mapper.RestaurantMessagingDataMapper;
import com.food.order.system.restaurant.domain.service.config.RestaurantServiceConfigData;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderRejectedMessagePublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
public class OrderRejectedKafkaMessagePublisher implements OrderRejectedMessagePublisher {
private final RestaurantMessagingDataMapper restaurantMessagingDataMapper;
private final KafkaProducer<String, RestaurantApprovalResponseAvroModel> kafkaProducer;
private final RestaurantServiceConfigData restaurantServiceConfigData;
private final KafkaMessageHelper kafkaMessageHelper;
@Override
public void publish(OrderRejectedEvent orderRejectedEvent) {
String orderId = orderRejectedEvent.getOrderApproval().getOrderId().getValue().toString();
log.info("Received OrderRejectedEvent for order id: {}", orderId);
try {
RestaurantApprovalResponseAvroModel restaurantApprovalResponseAvroModel =
restaurantMessagingDataMapper
.orderRejectedEventToRestaurantApprovalResponseAvroModel(orderRejectedEvent);
kafkaProducer.send(restaurantServiceConfigData.getRestaurantApprovalResponseTopicName(),
orderId,
restaurantApprovalResponseAvroModel,
kafkaMessageHelper.getKafkaCallBack(restaurantServiceConfigData
.getRestaurantApprovalResponseTopicName(),
restaurantApprovalResponseAvroModel,
orderId,
"RestaurantApprovalResponseAvroModel"));
log.info("RestaurantApprovalResponseAvroModel sent to kafka at: {}", System.nanoTime());
} catch (Exception e) {
log.error("Error while sending RestaurantApprovalResponseAvroModel message" +
" to kafka with order id: {}, error: {}", orderId, e.getMessage());
}
}
}

View File

@@ -1,21 +1,24 @@
package com.food.order.system.restaurant.messaging.publisher.kafka;
import com.food.order.system.kafka.order.avro.model.RestaurantApprovalResponseAvroModel;
import com.food.order.system.kafka.producer.KafkaMessageHelper;
import com.food.order.system.kafka.producer.service.KafkaProducer;
import com.food.order.system.restaurant.domain.core.event.OrderApprovedEvent;
import com.food.order.system.restaurant.messaging.mapper.RestaurantMessagingDataMapper;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.restaurant.domain.service.config.RestaurantServiceConfigData;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.OrderApprovedMessagePublisher;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderEventPayload;
import com.food.order.system.restaurant.domain.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.restaurant.domain.service.ports.output.message.publisher.RestaurantApprovalResponseMessagePublisher;
import com.food.order.system.restaurant.messaging.mapper.RestaurantMessagingDataMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.function.BiConsumer;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderApprovedKafkaMessagePublisher implements OrderApprovedMessagePublisher {
@Component
public class RestaurantApprovalEventKafkaPublisher implements RestaurantApprovalResponseMessagePublisher {
private final RestaurantMessagingDataMapper restaurantMessagingDataMapper;
private final KafkaProducer<String, RestaurantApprovalResponseAvroModel> kafkaProducer;
@@ -24,29 +27,39 @@ public class OrderApprovedKafkaMessagePublisher implements OrderApprovedMessageP
@Override
public void publish(OrderApprovedEvent orderApprovedEvent) {
String orderId = orderApprovedEvent.getOrderApproval().getOrderId().getValue().toString();
public void publish(OrderOutboxMessage orderOutboxMessage,
BiConsumer<OrderOutboxMessage, OutboxStatus> outboxCallback) {
OrderEventPayload orderEventPayload =
kafkaMessageHelper.getOrderEventPayload(orderOutboxMessage.getPayload(),
OrderEventPayload.class);
log.info("Received OrderApprovedEvent for order id: {}", orderId);
String sagaId = orderOutboxMessage.getSagaId().toString();
log.info("Received OrderOutboxMessage for order id: {} and saga id: {}",
orderEventPayload.getOrderId(),
sagaId);
try {
RestaurantApprovalResponseAvroModel restaurantApprovalResponseAvroModel =
restaurantMessagingDataMapper
.orderApprovedEventToRestaurantApprovalResponseAvroModel(orderApprovedEvent);
.orderEventPayloadToRestaurantApprovalResponseAvroModel(sagaId, orderEventPayload);
kafkaProducer.send(restaurantServiceConfigData.getRestaurantApprovalResponseTopicName(),
orderId,
sagaId,
restaurantApprovalResponseAvroModel,
kafkaMessageHelper.getKafkaCallBack(restaurantServiceConfigData
kafkaMessageHelper.getKafkaCallback(restaurantServiceConfigData
.getRestaurantApprovalResponseTopicName(),
restaurantApprovalResponseAvroModel,
orderId,
orderOutboxMessage,
outboxCallback,
orderEventPayload.getOrderId(),
"RestaurantApprovalResponseAvroModel"));
log.info("RestaurantApprovalResponseAvroModel sent to kafka at: {}", System.nanoTime());
log.info("RestaurantApprovalResponseAvroModel sent to kafka for order id: {} and saga id: {}",
restaurantApprovalResponseAvroModel.getOrderId(), sagaId);
} catch (Exception e) {
log.error("Error while sending RestaurantApprovalResponseAvroModel message" +
" to kafka with order id: {}, error: {}", orderId, e.getMessage());
" to kafka with order id: {} and saga id: {}, error: {}",
orderEventPayload.getOrderId(), sagaId, e.getMessage());
}
}