updating scs-100 tutorial
This commit is contained in:
33
scs-100/.gitignore
vendored
Normal file
33
scs-100/.gitignore
vendored
Normal file
@@ -0,0 +1,33 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
19
scs-100/docker-compose.yml
Normal file
19
scs-100/docker-compose.yml
Normal file
@@ -0,0 +1,19 @@
|
||||
version: '3'
|
||||
services:
|
||||
kafka:
|
||||
image: wurstmeister/kafka
|
||||
container_name: kafka-mc
|
||||
ports:
|
||||
- "9092:9092"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
|
||||
- KAFKA_ADVERTISED_PORT=9092
|
||||
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
depends_on:
|
||||
- zookeeper
|
||||
zookeeper:
|
||||
image: wurstmeister/zookeeper
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_HOST_NAME=zookeeper
|
||||
105
scs-100/pom.xml
Normal file
105
scs-100/pom.xml
Normal file
@@ -0,0 +1,105 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.4.5</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
|
||||
<groupId>com.ehsaniara.scs_kafka_intro</groupId>
|
||||
<artifactId>scs-100</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>scs-100</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>11</java.version>
|
||||
<spring-cloud.version>Hoxton.SR11</spring-cloud.version>
|
||||
<spring-boot.version>${project.parent.version}</spring-boot.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<scope>test</scope>
|
||||
<classifier>test-binder</classifier>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-dependencies</artifactId>
|
||||
<version>${spring-cloud.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
|
||||
//EnableBinding will be Deprecated as of 3.1 in favor of functional programming model, stay tuned for the next tutorials
|
||||
@EnableBinding(value = {PurchaseBinder.class})
|
||||
@SpringBootApplication
|
||||
public class Application {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Application.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
import javax.validation.constraints.NotBlank;
|
||||
import java.util.UUID;
|
||||
|
||||
@ToString
|
||||
@Builder
|
||||
@Setter
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class Order {
|
||||
|
||||
private UUID orderUuid;
|
||||
|
||||
@NotBlank
|
||||
private String itemName;
|
||||
|
||||
private OrderStatus orderStatus;
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
|
||||
@ResponseStatus(value = HttpStatus.I_AM_A_TEAPOT)
|
||||
public class OrderFailedException extends RuntimeException {
|
||||
|
||||
public OrderFailedException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
|
||||
@ResponseStatus(value = HttpStatus.NOT_FOUND)
|
||||
public class OrderNotFoundException extends RuntimeException {
|
||||
public OrderNotFoundException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
|
||||
@AllArgsConstructor
|
||||
public enum OrderStatus {
|
||||
PENDING("PENDING"),
|
||||
INVENTORY_CHECKING("INVENTORY_CHECKING"),
|
||||
INSUFFICIENT_INVENTORY("INSUFFICIENT_INVENTORY"),
|
||||
SHIPPED("SHIPPED"),
|
||||
CANCELED("CANCELED");
|
||||
|
||||
private final String name;
|
||||
|
||||
public String toString() {
|
||||
return this.name;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.context.request.WebRequest;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
@AllArgsConstructor
|
||||
public class ProducerController {
|
||||
|
||||
private final PurchaseService purchaseService;
|
||||
|
||||
@PostMapping("order")
|
||||
public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) {
|
||||
|
||||
return purchaseService.placeOrder(order);
|
||||
}
|
||||
|
||||
@GetMapping("order/status/{orderUuid}")
|
||||
public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) {
|
||||
|
||||
return purchaseService.statusCheck(orderUuid);
|
||||
}
|
||||
|
||||
@ControllerAdvice
|
||||
public static class RestResponseEntityExceptionHandler
|
||||
extends ResponseEntityExceptionHandler {
|
||||
|
||||
@ExceptionHandler({OrderFailedException.class})
|
||||
public ResponseEntity<Object> orderErrorException(
|
||||
Exception ex, WebRequest request) {
|
||||
return new ResponseEntity<Object>(
|
||||
"I_AM_A_TEAPOT", new HttpHeaders(), HttpStatus.I_AM_A_TEAPOT);
|
||||
}
|
||||
|
||||
@ExceptionHandler({OrderNotFoundException.class})
|
||||
public ResponseEntity<Object> orderNotFoundException(
|
||||
Exception ex, WebRequest request) {
|
||||
return new ResponseEntity<Object>(
|
||||
"NOT_FOUND", new HttpHeaders(), HttpStatus.NOT_FOUND);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
public interface PurchaseBinder {
|
||||
|
||||
//spring.cloud.stream.bindings.inventoryChecking-in
|
||||
String INVENTORY_CHECKING_IN = "inventoryChecking-in";
|
||||
String INVENTORY_CHECKING_OUT = "inventoryChecking-out";
|
||||
|
||||
@Input(INVENTORY_CHECKING_IN)
|
||||
SubscribableChannel inventoryCheckingIn();
|
||||
|
||||
@Output(INVENTORY_CHECKING_OUT)
|
||||
MessageChannel inventoryCheckingOut();
|
||||
|
||||
////
|
||||
|
||||
String ORDER_DLQ = "order-dlq";
|
||||
|
||||
@Input(ORDER_DLQ)
|
||||
SubscribableChannel orderIn();
|
||||
|
||||
////
|
||||
|
||||
String SHIPPING_IN = "shipping-in";
|
||||
String SHIPPING_OUT = "shipping-out";
|
||||
|
||||
@Input(SHIPPING_IN)
|
||||
SubscribableChannel shippingIn();
|
||||
|
||||
@Output(SHIPPING_OUT)
|
||||
MessageChannel shippingOut();
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
package com.ehsaniara.scs_kafka_intro.scs100;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class PurchaseService {
|
||||
|
||||
private final PurchaseBinder purchaseBinder;
|
||||
|
||||
/**
|
||||
* this is a simulating your order dataBase (Single App Instance), We can replaced it with KStream in feature tutorials.
|
||||
*/
|
||||
Map<UUID, Order> orderDataBase = new HashMap<>();
|
||||
|
||||
public OrderStatus statusCheck(UUID orderUuid) {
|
||||
return Optional.ofNullable(orderDataBase)
|
||||
.map(c -> c.get(orderUuid))
|
||||
.orElseThrow(() -> new OrderNotFoundException("Order not found")).getOrderStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* here we placing the order into the topic and responding to the REST call immediately and not keep the thread busy much
|
||||
*/
|
||||
public Order placeOrder(Order orderIn) {
|
||||
log.debug("placeOrder orderIn: {}", orderIn);
|
||||
|
||||
var order = Order.builder()//
|
||||
.itemName(orderIn.getItemName())//
|
||||
.orderUuid(UUID.randomUUID())//
|
||||
.orderStatus(OrderStatus.PENDING)//
|
||||
.build();
|
||||
|
||||
//update the status
|
||||
orderDataBase.put(order.getOrderUuid(), order);
|
||||
|
||||
//send it for inventory check
|
||||
purchaseBinder.inventoryCheckingOut()//
|
||||
.send(MessageBuilder.withPayload(order)//
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)//
|
||||
.build());
|
||||
return order;
|
||||
}
|
||||
|
||||
/**
|
||||
* check inventory System for Item availability.
|
||||
* this is a third party service simulation and
|
||||
* let say it tacks around 5 seconds to check your inventory
|
||||
*/
|
||||
@StreamListener(PurchaseBinder.INVENTORY_CHECKING_IN)
|
||||
@SneakyThrows
|
||||
public void checkInventory(@Payload Order orderIn) {
|
||||
log.debug("checkInventory orderIn: {}", orderIn);
|
||||
orderIn.setOrderStatus(OrderStatus.INVENTORY_CHECKING);
|
||||
orderDataBase.put(orderIn.getOrderUuid(), orderIn);
|
||||
|
||||
Thread.sleep(5_000);//5 sec delay
|
||||
|
||||
// just a simulation of create exception for random orders (1 in 2) in case of inventory insufficiency
|
||||
if (System.currentTimeMillis() % 2 == 0) {
|
||||
orderIn.setOrderStatus(OrderStatus.INSUFFICIENT_INVENTORY);
|
||||
orderDataBase.put(orderIn.getOrderUuid(), orderIn);
|
||||
throw new OrderFailedException(String.format("insufficient inventory for order: %s", orderIn.getOrderUuid()));
|
||||
}
|
||||
|
||||
//Order is good to go for shipping
|
||||
purchaseBinder.shippingOut()//
|
||||
.send(MessageBuilder.withPayload(orderIn)//
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)//
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Order is shipped
|
||||
*/
|
||||
@StreamListener(PurchaseBinder.SHIPPING_IN)
|
||||
public void shipIt(@Payload Order orderIn) {
|
||||
log.debug("shipIt orderIn: {}", orderIn);
|
||||
orderIn.setOrderStatus(OrderStatus.SHIPPED);
|
||||
orderDataBase.put(orderIn.getOrderUuid(), orderIn);
|
||||
|
||||
log.info("ItemID: {} has been Shipped", orderIn.getOrderUuid());
|
||||
}
|
||||
|
||||
/**
|
||||
* this is eventually a DLQ,
|
||||
* for a general purpose
|
||||
*/
|
||||
@StreamListener(PurchaseBinder.ORDER_DLQ)
|
||||
public void cancelOrder(@Payload Order orderIn) {
|
||||
log.warn("cancelOrder orderIn: {}", orderIn);
|
||||
orderIn.setOrderStatus(OrderStatus.CANCELED);
|
||||
orderDataBase.put(orderIn.getOrderUuid(), orderIn);
|
||||
}
|
||||
}
|
||||
43
scs-100/src/main/resources/application.yml
Normal file
43
scs-100/src/main/resources/application.yml
Normal file
@@ -0,0 +1,43 @@
|
||||
spring:
|
||||
application:
|
||||
name: scs-100
|
||||
|
||||
cloud.stream:
|
||||
bindings:
|
||||
##
|
||||
inventoryChecking-out:
|
||||
destination: scs-100.inventoryChecking
|
||||
inventoryChecking-in:
|
||||
destination: scs-100.inventoryChecking
|
||||
group: ${spring.application.name}-inventoryChecking-group
|
||||
consumer:
|
||||
maxAttempts: 1
|
||||
|
||||
order-dlq:
|
||||
destination: scs-100.ordering_dlq
|
||||
|
||||
shipping-out:
|
||||
destination: scs-100.shipping
|
||||
shipping-in:
|
||||
destination: scs-100.shipping
|
||||
group: ${spring.application.name}-shipping-group
|
||||
|
||||
kafka:
|
||||
bindings:
|
||||
# If Inventory Checking fails
|
||||
inventoryChecking-in.consumer:
|
||||
enableDlq: true
|
||||
dlqName: scs-100.ordering_dlq
|
||||
autoCommitOnError: true
|
||||
AutoCommitOffset: true
|
||||
|
||||
# If shipping fails
|
||||
shipping-in.consumer:
|
||||
enableDlq: true
|
||||
dlqName: scs-100.ordering_dlq
|
||||
autoCommitOnError: true
|
||||
AutoCommitOffset: true
|
||||
|
||||
logging:
|
||||
level:
|
||||
com.ehsaniara.scs_kafka_intro: debug
|
||||
Reference in New Issue
Block a user