diff --git a/scs-100/.gitignore b/scs-100/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/scs-100/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/scs-100/docker-compose.yml b/scs-100/docker-compose.yml new file mode 100644 index 0000000..347e011 --- /dev/null +++ b/scs-100/docker-compose.yml @@ -0,0 +1,19 @@ +version: '3' +services: + kafka: + image: wurstmeister/kafka + container_name: kafka-mc + ports: + - "9092:9092" + environment: + - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 + - KAFKA_ADVERTISED_PORT=9092 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + depends_on: + - zookeeper + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + environment: + - KAFKA_ADVERTISED_HOST_NAME=zookeeper diff --git a/scs-100/pom.xml b/scs-100/pom.xml new file mode 100644 index 0000000..38dfb43 --- /dev/null +++ b/scs-100/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.4.5 + + + + com.ehsaniara.scs_kafka_intro + scs-100 + 0.0.1-SNAPSHOT + scs-100 + Demo project for Spring Boot + + + 11 + Hoxton.SR11 + ${project.parent.version} + + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.cloud + spring-cloud-stream + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.projectlombok + lombok + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.cloud + spring-cloud-stream + test + test-binder + test-jar + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + org.projectlombok + lombok + + + + + + + + + diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/Application.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/Application.java new file mode 100644 index 0000000..2da929c --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/Application.java @@ -0,0 +1,16 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; + +//EnableBinding will be Deprecated as of 3.1 in favor of functional programming model, stay tuned for the next tutorials +@EnableBinding(value = {PurchaseBinder.class}) +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/Order.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/Order.java new file mode 100644 index 0000000..684c885 --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/Order.java @@ -0,0 +1,22 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import lombok.*; + +import javax.validation.constraints.NotBlank; +import java.util.UUID; + +@ToString +@Builder +@Setter +@Getter +@AllArgsConstructor +@NoArgsConstructor +public class Order { + + private UUID orderUuid; + + @NotBlank + private String itemName; + + private OrderStatus orderStatus; +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderFailedException.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderFailedException.java new file mode 100644 index 0000000..006cf1a --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderFailedException.java @@ -0,0 +1,12 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(value = HttpStatus.I_AM_A_TEAPOT) +public class OrderFailedException extends RuntimeException { + + public OrderFailedException(String msg) { + super(msg); + } +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderNotFoundException.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderNotFoundException.java new file mode 100644 index 0000000..87d4f2c --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderNotFoundException.java @@ -0,0 +1,11 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(value = HttpStatus.NOT_FOUND) +public class OrderNotFoundException extends RuntimeException { + public OrderNotFoundException(String msg) { + super(msg); + } +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderStatus.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderStatus.java new file mode 100644 index 0000000..46d2c54 --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderStatus.java @@ -0,0 +1,18 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public enum OrderStatus { + PENDING("PENDING"), + INVENTORY_CHECKING("INVENTORY_CHECKING"), + INSUFFICIENT_INVENTORY("INSUFFICIENT_INVENTORY"), + SHIPPED("SHIPPED"), + CANCELED("CANCELED"); + + private final String name; + + public String toString() { + return this.name; + } +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/ProducerController.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/ProducerController.java new file mode 100644 index 0000000..219254a --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/ProducerController.java @@ -0,0 +1,53 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.WebRequest; +import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler; + +import javax.validation.constraints.NotNull; +import java.util.UUID; + +@Slf4j +@RestController +@AllArgsConstructor +public class ProducerController { + + private final PurchaseService purchaseService; + + @PostMapping("order") + public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) { + + return purchaseService.placeOrder(order); + } + + @GetMapping("order/status/{orderUuid}") + public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) { + + return purchaseService.statusCheck(orderUuid); + } + + @ControllerAdvice + public static class RestResponseEntityExceptionHandler + extends ResponseEntityExceptionHandler { + + @ExceptionHandler({OrderFailedException.class}) + public ResponseEntity orderErrorException( + Exception ex, WebRequest request) { + return new ResponseEntity( + "I_AM_A_TEAPOT", new HttpHeaders(), HttpStatus.I_AM_A_TEAPOT); + } + + @ExceptionHandler({OrderNotFoundException.class}) + public ResponseEntity orderNotFoundException( + Exception ex, WebRequest request) { + return new ResponseEntity( + "NOT_FOUND", new HttpHeaders(), HttpStatus.NOT_FOUND); + } + + } +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/PurchaseBinder.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/PurchaseBinder.java new file mode 100644 index 0000000..324645e --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/PurchaseBinder.java @@ -0,0 +1,37 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +public interface PurchaseBinder { + + //spring.cloud.stream.bindings.inventoryChecking-in + String INVENTORY_CHECKING_IN = "inventoryChecking-in"; + String INVENTORY_CHECKING_OUT = "inventoryChecking-out"; + + @Input(INVENTORY_CHECKING_IN) + SubscribableChannel inventoryCheckingIn(); + + @Output(INVENTORY_CHECKING_OUT) + MessageChannel inventoryCheckingOut(); + + //// + + String ORDER_DLQ = "order-dlq"; + + @Input(ORDER_DLQ) + SubscribableChannel orderIn(); + + //// + + String SHIPPING_IN = "shipping-in"; + String SHIPPING_OUT = "shipping-out"; + + @Input(SHIPPING_IN) + SubscribableChannel shippingIn(); + + @Output(SHIPPING_OUT) + MessageChannel shippingOut(); +} diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/PurchaseService.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/PurchaseService.java new file mode 100644 index 0000000..5efba7c --- /dev/null +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/PurchaseService.java @@ -0,0 +1,109 @@ +package com.ehsaniara.scs_kafka_intro.scs100; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +import org.springframework.util.MimeTypeUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +@Slf4j +@Service +@RequiredArgsConstructor +public class PurchaseService { + + private final PurchaseBinder purchaseBinder; + + /** + * this is a simulating your order dataBase (Single App Instance), We can replaced it with KStream in feature tutorials. + */ + Map orderDataBase = new HashMap<>(); + + public OrderStatus statusCheck(UUID orderUuid) { + return Optional.ofNullable(orderDataBase) + .map(c -> c.get(orderUuid)) + .orElseThrow(() -> new OrderNotFoundException("Order not found")).getOrderStatus(); + } + + /** + * here we placing the order into the topic and responding to the REST call immediately and not keep the thread busy much + */ + public Order placeOrder(Order orderIn) { + log.debug("placeOrder orderIn: {}", orderIn); + + var order = Order.builder()// + .itemName(orderIn.getItemName())// + .orderUuid(UUID.randomUUID())// + .orderStatus(OrderStatus.PENDING)// + .build(); + + //update the status + orderDataBase.put(order.getOrderUuid(), order); + + //send it for inventory check + purchaseBinder.inventoryCheckingOut()// + .send(MessageBuilder.withPayload(order)// + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)// + .build()); + return order; + } + + /** + * check inventory System for Item availability. + * this is a third party service simulation and + * let say it tacks around 5 seconds to check your inventory + */ + @StreamListener(PurchaseBinder.INVENTORY_CHECKING_IN) + @SneakyThrows + public void checkInventory(@Payload Order orderIn) { + log.debug("checkInventory orderIn: {}", orderIn); + orderIn.setOrderStatus(OrderStatus.INVENTORY_CHECKING); + orderDataBase.put(orderIn.getOrderUuid(), orderIn); + + Thread.sleep(5_000);//5 sec delay + + // just a simulation of create exception for random orders (1 in 2) in case of inventory insufficiency + if (System.currentTimeMillis() % 2 == 0) { + orderIn.setOrderStatus(OrderStatus.INSUFFICIENT_INVENTORY); + orderDataBase.put(orderIn.getOrderUuid(), orderIn); + throw new OrderFailedException(String.format("insufficient inventory for order: %s", orderIn.getOrderUuid())); + } + + //Order is good to go for shipping + purchaseBinder.shippingOut()// + .send(MessageBuilder.withPayload(orderIn)// + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)// + .build()); + } + + /** + * Order is shipped + */ + @StreamListener(PurchaseBinder.SHIPPING_IN) + public void shipIt(@Payload Order orderIn) { + log.debug("shipIt orderIn: {}", orderIn); + orderIn.setOrderStatus(OrderStatus.SHIPPED); + orderDataBase.put(orderIn.getOrderUuid(), orderIn); + + log.info("ItemID: {} has been Shipped", orderIn.getOrderUuid()); + } + + /** + * this is eventually a DLQ, + * for a general purpose + */ + @StreamListener(PurchaseBinder.ORDER_DLQ) + public void cancelOrder(@Payload Order orderIn) { + log.warn("cancelOrder orderIn: {}", orderIn); + orderIn.setOrderStatus(OrderStatus.CANCELED); + orderDataBase.put(orderIn.getOrderUuid(), orderIn); + } +} diff --git a/scs-100/src/main/resources/application.yml b/scs-100/src/main/resources/application.yml new file mode 100644 index 0000000..731f1ed --- /dev/null +++ b/scs-100/src/main/resources/application.yml @@ -0,0 +1,43 @@ +spring: + application: + name: scs-100 + + cloud.stream: + bindings: + ## + inventoryChecking-out: + destination: scs-100.inventoryChecking + inventoryChecking-in: + destination: scs-100.inventoryChecking + group: ${spring.application.name}-inventoryChecking-group + consumer: + maxAttempts: 1 + + order-dlq: + destination: scs-100.ordering_dlq + + shipping-out: + destination: scs-100.shipping + shipping-in: + destination: scs-100.shipping + group: ${spring.application.name}-shipping-group + + kafka: + bindings: + # If Inventory Checking fails + inventoryChecking-in.consumer: + enableDlq: true + dlqName: scs-100.ordering_dlq + autoCommitOnError: true + AutoCommitOffset: true + + # If shipping fails + shipping-in.consumer: + enableDlq: true + dlqName: scs-100.ordering_dlq + autoCommitOnError: true + AutoCommitOffset: true + +logging: + level: + com.ehsaniara.scs_kafka_intro: debug