CQRS Pattern - > implemented completed.

This commit is contained in:
Ali CANLI
2022-07-17 15:03:15 +03:00
parent b8c4cc6e83
commit ba432017f3
16 changed files with 195 additions and 29 deletions

View File

@@ -22,7 +22,6 @@ spring:
init:
mode: always
schema-locations: classpath:init-schema.sql
data-locations: classpath:init-data.sql
platform: postgres
kafka-config:

View File

@@ -1,5 +0,0 @@
INSERT INTO customer.customers(id, username, first_name, last_name)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb41', 'user_1', 'First', 'User');
INSERT INTO customer.customers(id, username, first_name, last_name)
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb42', 'user_2', 'Second', 'User');

View File

@@ -9,6 +9,7 @@ order-service:
payment-response-topic-name: payment-response-value
restaurant-approval-request-topic-name: restaurant-approval-request-value
restaurant-approval-response-topic-name: restaurant-approval-response-value
customer-topic-name: customer
outbox-scheduler-fixed-rate: 10000
outbox-scheduler-initial-delay: 10000
@@ -54,6 +55,7 @@ kafka-consumer-config:
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
payment-consumer-group-id: payment-topic-consumer
restaurant-approval-consumer-group-id: restaurant-approval-topic-consumer
customer-group-id: customer-topic-consumer
auto-offset-reset: earliest
specific-avro-reader-key: specific.avro.reader
specific-avro-reader: true

View File

@@ -36,9 +36,9 @@ CREATE TABLE "order".order_items
ALTER TABLE "order".order_items
ADD CONSTRAINT "FK_ORDER_ID" FOREIGN KEY (order_id)
REFERENCES "order".orders (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
REFERENCES "order".orders (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
NOT VALID;
DROP TABLE IF EXISTS "order".order_address CASCADE;
@@ -55,17 +55,16 @@ CREATE TABLE "order".order_address
ALTER TABLE "order".order_address
ADD CONSTRAINT "FK_ORDER_ID" FOREIGN KEY (order_id)
REFERENCES "order".orders (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
REFERENCES "order".orders (id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
NOT VALID;
DROP TYPE IF EXISTS saga_status;
CREATE TYPE saga_status AS ENUM ('STARTED','FAILED','SUCCEEDED','PROCESSING', 'COMPENSATING','COMPENSATED');
CREATE TYPE saga_status AS ENUM ('STARTED', 'FAILED', 'SUCCEEDED', 'PROCESSING', 'COMPENSATING', 'COMPENSATED');
DROP TYPE IF EXISTS outbox_status;
CREATE TYPE outbox_status AS ENUM ('STARTED','COMPLETED','FAILED');
CREATE TYPE outbox_status AS ENUM ('STARTED', 'COMPLETED', 'FAILED');
DROP TABLE IF EXISTS "order".payment_outbox CASCADE;
@@ -88,10 +87,9 @@ CREATE INDEX "payment_outbox_saga_status"
ON "order".payment_outbox
(type, outbox_status, saga_status);
CREATE UNIQUE INDEX "payment_outbox_saga_id"
ON "order".payment_outbox
(type, saga_id, saga_status);
--CREATE UNIQUE INDEX "payment_outbox_saga_id"
-- ON "order".payment_outbox
-- (type, saga_id, saga_status);
DROP TABLE IF EXISTS "order".restaurant_approval_outbox CASCADE;
@@ -114,6 +112,17 @@ CREATE INDEX "restaurant_approval_outbox_saga_status"
ON "order".restaurant_approval_outbox
(type, outbox_status, saga_status);
CREATE UNIQUE INDEX "restaurant_approval_outbox_saga_id"
ON "order".restaurant_approval_outbox
(type, saga_id, saga_status);
--CREATE UNIQUE INDEX "restaurant_approval_outbox_saga_id"
-- ON "order".restaurant_approval_outbox
-- (type, saga_id, saga_status);
DROP TABLE IF EXISTS "order".customers CASCADE;
CREATE TABLE "order".customers
(
id uuid NOT NULL,
username character varying COLLATE pg_catalog."default" NOT NULL,
first_name character varying COLLATE pg_catalog."default" NOT NULL,
last_name character varying COLLATE pg_catalog."default" NOT NULL,
CONSTRAINT customers_pkey PRIMARY KEY (id)
);

View File

@@ -19,6 +19,12 @@ public class CustomerRepositoryImpl implements CustomerRepository {
private final CustomerDataAccessMapper customerDataAccessMapper;
@Override
public Customer save(Customer customer) {
return customerDataAccessMapper.customerEntityToCustomer(
customerJpaRepository.save(customerDataAccessMapper.customerToCustomerEntity(customer)));
}
@Override
public Optional<Customer> findCustomer(UUID customerId) {
return customerJpaRepository.findById(customerId)

View File

@@ -10,7 +10,7 @@ import java.util.UUID;
@Entity
@DynamicUpdate
@Table(name = "order_customer_m_view",schema = "customer")
@Table(name = "customers")
@Getter
@Setter
@Builder
@@ -20,4 +20,8 @@ public class CustomerEntity {
@Id
private UUID id;
private String username;
private String firstName;
private String lastName;
}

View File

@@ -11,4 +11,13 @@ public class CustomerDataAccessMapper {
public Customer customerEntityToCustomer(CustomerEntity customerEntity) {
return new Customer(new CustomerId(customerEntity.getId()));
}
public CustomerEntity customerToCustomerEntity(Customer customer) {
return CustomerEntity.builder()
.id(customer.getId().getValue())
.firstName(customer.getFirstName())
.lastName(customer.getLastName())
.username(customer.getUsername())
.build();
}
}

View File

@@ -0,0 +1,37 @@
package com.food.order.system;
import com.food.order.system.dto.message.CustomerModel;
import com.food.order.system.mapper.OrderDataMapper;
import com.food.order.system.ports.input.message.listener.customer.CustomerMessageListener;
import com.food.order.system.ports.output.repository.CustomerRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Objects;
@Service
@Slf4j
@RequiredArgsConstructor
public class CustomerMessageListenerImpl implements CustomerMessageListener {
private final CustomerRepository customerRepository;
private final OrderDataMapper orderDataMapper;
@Override
public void customerCreated(CustomerModel customerModel) {
var customer = customerRepository.save
(orderDataMapper.customerModelToCustomer(customerModel));
if (Objects.isNull(customer)) {
log.error("Customer not created");
} else {
log.info("Customer created");
}
}
}

View File

@@ -0,0 +1,15 @@
package com.food.order.system.dto.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
@Getter
@Builder
@AllArgsConstructor
public class CustomerModel {
private String id;
private String username;
private String firstName;
private String lastName;
}

View File

@@ -1,9 +1,6 @@
package com.food.order.system.mapper;
import com.food.order.system.domain.entity.Order;
import com.food.order.system.domain.entity.OrderItem;
import com.food.order.system.domain.entity.Product;
import com.food.order.system.domain.entity.Restaurant;
import com.food.order.system.domain.entity.*;
import com.food.order.system.domain.event.OrderCancelledEvent;
import com.food.order.system.domain.event.OrderCreatedEvent;
import com.food.order.system.domain.event.OrderPaidEvent;
@@ -11,6 +8,7 @@ import com.food.order.system.domain.valueobject.StreetAddress;
import com.food.order.system.dto.create.CreateOrderCommand;
import com.food.order.system.dto.create.CreateOrderResponse;
import com.food.order.system.dto.create.OrderAddress;
import com.food.order.system.dto.message.CustomerModel;
import com.food.order.system.dto.track.TrackOrderResponse;
import com.food.order.system.outbox.model.approval.OrderApprovalEventPayload;
import com.food.order.system.outbox.model.approval.OrderApprovalProduct;
@@ -24,6 +22,14 @@ import java.util.UUID;
@Component
public class OrderDataMapper {
public Customer customerModelToCustomer(CustomerModel customerModel) {
return new Customer(new CustomerId(UUID.fromString(customerModel.getId())),
customerModel.getUsername(),
customerModel.getFirstName(),
customerModel.getLastName());
}
public OrderPaymentEventPayload orderCancelledEventToOrderPaymentEventPayload(
OrderCancelledEvent orderCancelledEvent) {
return OrderPaymentEventPayload.builder()

View File

@@ -0,0 +1,7 @@
package com.food.order.system.ports.input.message.listener.customer;
import com.food.order.system.dto.message.CustomerModel;
public interface CustomerMessageListener {
void customerCreated(CustomerModel customerModel);
}

View File

@@ -6,5 +6,7 @@ import java.util.Optional;
import java.util.UUID;
public interface CustomerRepository {
Customer save(Customer customer);
Optional<Customer> findCustomer(UUID customerId);
}

View File

@@ -5,10 +5,33 @@ import com.food.order.system.valueobject.CustomerId;
public class Customer extends AggregateRoot<CustomerId> {
public Customer(){
private String username;
private String firstName;
private String lastName;
public Customer(CustomerId id, String username, String firstName, String lastName) {
super.setId(id);
this.username = username;
this.firstName = firstName;
this.lastName = lastName;
}
public Customer (CustomerId id) {
super.setId(id);
}
public String getUsername() {
return username;
}
public String getFirstName() {
return firstName;
}
public String getLastName() {
return lastName;
}
}

View File

@@ -0,0 +1,42 @@
package com.food.order.system.messaging.listener.kafka;
import com.food.order.system.kafka.consumer.KafkaConsumer;
import com.food.order.system.kafka.order.avro.model.CustomerAvroModel;
import com.food.order.system.messaging.mapper.OrderMessagingDataMapper;
import com.food.order.system.ports.input.message.listener.customer.CustomerMessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomerKafkaListener implements KafkaConsumer<CustomerAvroModel> {
private final CustomerMessageListener customerMessageListener;
private final OrderMessagingDataMapper orderMessagingDataMapper;
@Override
@KafkaListener(id = "${kafka-consumer-config.customer-group-id}",
topics = "${order-service.customer-topic-name}")
public void receive(List<CustomerAvroModel> messages,
List<String> keys,
List<Integer> partitions,
List<Long> offSets) {
log.info("{} number of customer create messages received with keys {}, partitions {} and offsets {}",
messages.size(),
keys.toString(),
partitions.toString(),
offSets.toString());
messages.forEach(customerAvroModel ->
customerMessageListener.customerCreated(orderMessagingDataMapper
.customerAvroModelToCustomerModel(customerAvroModel)));
}
}

View File

@@ -1,5 +1,6 @@
package com.food.order.system.messaging.mapper;
import com.food.order.system.dto.message.CustomerModel;
import com.food.order.system.dto.message.PaymentResponse;
import com.food.order.system.dto.message.RestaurantApprovalResponse;
import com.food.order.system.kafka.order.avro.model.*;
@@ -75,4 +76,13 @@ public class OrderMessagingDataMapper {
}
public CustomerModel customerAvroModelToCustomerModel(CustomerAvroModel customerAvroModel) {
return CustomerModel.builder()
.id(customerAvroModel.getId())
.username(customerAvroModel.getUsername())
.firstName(customerAvroModel.getFirstName())
.lastName(customerAvroModel.getLastName())
.build();
}
}

View File

@@ -28,7 +28,7 @@
<properties>
<maven-compiler-plugin.version>3.9.0</maven-compiler-plugin.version>
<mockito.version>4.5.1</mockito.version>
<mockito.version>4.3.1</mockito.version>
<spring-kafka.version>2.8.2</spring-kafka.version>
<kavka-avro.serializer.version>7.0.1</kavka-avro.serializer.version>
<avro.version>1.11.0</avro.version>