From 773009658f3fdb9e6bc5e18a8ddd82051972c88e Mon Sep 17 00:00:00 2001 From: Jay Ehsaniara Date: Sun, 16 May 2021 19:02:05 -0700 Subject: [PATCH] updating scs-099 tutorial --- docker-compose.yml | 20 +++++ scs-099/.gitignore | 33 +++++++++ scs-099/docker-compose.yml | 19 +++++ scs-099/pom.xml | 72 ++++++++++++++++++ .../scs_kafka_intro/scs099/Application.java | 74 +++++++++++++++++++ .../scs_kafka_intro/scs099/MyBinder.java | 19 +++++ scs-099/src/main/resources/application.yml | 40 ++++++++++ scs-100/README.MD | 4 +- scs-100/pom.xml | 11 --- .../scs100/OrderController.java | 29 -------- .../scs_kafka_intro/scs100/OrderService.java | 3 +- scs-100/src/main/resources/application.yml | 2 +- 12 files changed, 283 insertions(+), 43 deletions(-) create mode 100644 docker-compose.yml create mode 100644 scs-099/.gitignore create mode 100644 scs-099/docker-compose.yml create mode 100644 scs-099/pom.xml create mode 100644 scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java create mode 100644 scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java create mode 100644 scs-099/src/main/resources/application.yml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..9985d29 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +version: '3' +services: + kafka: + image: wurstmeister/kafka + container_name: kafka-scs-kafka-intro + ports: + - "9092:9092" + environment: + - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 + - KAFKA_ADVERTISED_PORT=9092 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + depends_on: + - zookeeper + zookeeper: + image: wurstmeister/zookeeper + container_name: zookeeper-scs-kafka-intro + ports: + - "2181:2181" + environment: + - KAFKA_ADVERTISED_HOST_NAME=zookeeper diff --git a/scs-099/.gitignore b/scs-099/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/scs-099/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/scs-099/docker-compose.yml b/scs-099/docker-compose.yml new file mode 100644 index 0000000..347e011 --- /dev/null +++ b/scs-099/docker-compose.yml @@ -0,0 +1,19 @@ +version: '3' +services: + kafka: + image: wurstmeister/kafka + container_name: kafka-mc + ports: + - "9092:9092" + environment: + - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 + - KAFKA_ADVERTISED_PORT=9092 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + depends_on: + - zookeeper + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + environment: + - KAFKA_ADVERTISED_HOST_NAME=zookeeper diff --git a/scs-099/pom.xml b/scs-099/pom.xml new file mode 100644 index 0000000..4d95df5 --- /dev/null +++ b/scs-099/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.5 + + + + com.ehsaniara.scs_kafka_intro + scs-099 + 0.0.1-SNAPSHOT + scs-099 + Demo project for Spring Boot + + + 11 + Hoxton.SR11 + ${project.parent.version} + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + + org.projectlombok + lombok + true + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java new file mode 100644 index 0000000..2e3b755 --- /dev/null +++ b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java @@ -0,0 +1,74 @@ +package com.ehsaniara.scs_kafka_intro.scs099; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.util.MimeTypeUtils; + +import java.util.stream.IntStream; + +@EnableScheduling +@EnableBinding(value = {MyBinder.class}) +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Slf4j + @Service + @RequiredArgsConstructor + public static class PobSub { + + private final MyBinder myBinder; + + @Value("${server.port:8080}") + private int port; + + //counter for every Scheduled attempt + private int counter; + + @Scheduled(initialDelay = 5_000, fixedDelay = 5_000) + public void producer() { + //the producer works just for the app that runs at port 8080 + if (port == 8080) { + // 10 iterative loop + IntStream.range(0, 10) + .forEach(value -> { + //this is our Message payload + String message = String.format("TestString of %s - %s", counter, value); + + //here is out message publisher in the given channel into topic "scs-099.order" + myBinder.orderOut() + .send(MessageBuilder.withPayload(message) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) + .build()); + //to show what we have produced in kafka ("warn" to show in different color on the console) + log.warn("message produced: {}", message); + }); + counter++; + } + } + + @SneakyThrows + @StreamListener(MyBinder.ORDER_IN) + public void consumer(@Payload String message) { + //simulate 200ms delay when consumer is working in some task + Thread.sleep(200); + log.debug("message consumed: {}", message); + } + } + +} diff --git a/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java new file mode 100644 index 0000000..d834eed --- /dev/null +++ b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java @@ -0,0 +1,19 @@ +package com.ehsaniara.scs_kafka_intro.scs099; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +public interface MyBinder { + + // channels + String ORDER_IN = "order-in"; + String ORDER_OUT = "order-out"; + + @Input(ORDER_IN) + SubscribableChannel orderIn(); + + @Output(ORDER_OUT) + MessageChannel orderOut(); +} diff --git a/scs-099/src/main/resources/application.yml b/scs-099/src/main/resources/application.yml new file mode 100644 index 0000000..8d9ce46 --- /dev/null +++ b/scs-099/src/main/resources/application.yml @@ -0,0 +1,40 @@ +spring: + application: + name: scs-099 + + cloud.stream.bindings: + order-out.destination: scs-099.order # Topic Name + order-in.destination: scs-099.order # Topic Name + +# To see the DEBUG level logs in console +logging.level.com.ehsaniara.scs_kafka_intro: debug +--- +spring: + config: + activate: + on-profile: "test2" + + cloud.stream.bindings: + order-out.producer.partition-count: 10 + order-in: + group: ${spring.application.name}-shipping-group + consumer.concurrency: 1 + + cloud.stream.kafka.binder: + autoAddPartitions: true +--- +spring: + config: + activate: + on-profile: "test3" + + cloud.stream.bindings: + order-out.producer.partition-count: 10 + order-in: + group: ${spring.application.name}-shipping-group + consumer.concurrency: 3 + + cloud.stream.kafka.binder: + autoAddPartitions: true + + diff --git a/scs-100/README.MD b/scs-100/README.MD index f97578c..ac09a0e 100644 --- a/scs-100/README.MD +++ b/scs-100/README.MD @@ -1,6 +1,8 @@ # SCS-100 -A basic Example of an Event Driven Flow by the help of **SPRING CLOUD STREAM KAFKA** +## PubSub Mechanism + +A simple Example of an Event Driven Flow by the help of **SPRING CLOUD STREAM KAFKA** ##### properties diff --git a/scs-100/pom.xml b/scs-100/pom.xml index 38dfb43..029902d 100644 --- a/scs-100/pom.xml +++ b/scs-100/pom.xml @@ -34,22 +34,11 @@ spring-boot-starter-web - - org.springframework.cloud - spring-cloud-stream - - org.springframework.cloud spring-cloud-stream-binder-kafka - - org.springframework.boot - spring-boot-configuration-processor - true - - org.projectlombok lombok diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java index f40f824..24dea7b 100644 --- a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java @@ -1,18 +1,11 @@ package com.ehsaniara.scs_kafka_intro.scs100; import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.WebRequest; -import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler; import javax.validation.constraints.NotNull; import java.util.UUID; -@Slf4j @RestController @AllArgsConstructor public class OrderController { @@ -21,33 +14,11 @@ public class OrderController { @PostMapping("order") public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) { - return orderService.placeOrder(order); } @GetMapping("order/status/{orderUuid}") public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) { - return orderService.statusCheck(orderUuid); } - - @ControllerAdvice - public static class RestResponseEntityExceptionHandler - extends ResponseEntityExceptionHandler { - - @ExceptionHandler({OrderFailedException.class}) - public ResponseEntity orderErrorException( - Exception ex, WebRequest request) { - return new ResponseEntity( - "I_AM_A_TEAPOT", new HttpHeaders(), HttpStatus.I_AM_A_TEAPOT); - } - - @ExceptionHandler({OrderNotFoundException.class}) - public ResponseEntity orderNotFoundException( - Exception ex, WebRequest request) { - return new ResponseEntity( - "NOT_FOUND", new HttpHeaders(), HttpStatus.NOT_FOUND); - } - - } } diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderService.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderService.java index 34d9c4d..bb5abd7 100644 --- a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderService.java +++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderService.java @@ -59,7 +59,7 @@ public class OrderService { } /** - * check inventory System for Item availability. + * checking inventory System for Item availability. * this is a third party service simulation and * let say it tacks around 5 seconds to check your inventory */ @@ -76,6 +76,7 @@ public class OrderService { if (System.currentTimeMillis() % 2 == 0) { orderIn.setOrderStatus(OrderStatus.INSUFFICIENT_INVENTORY); orderDataBase.put(orderIn.getOrderUuid(), orderIn); + log.warn("Let's assume we ran out of stock for item: {}", orderIn.getItemName()); throw new OrderFailedException(String.format("insufficient inventory for order: %s", orderIn.getOrderUuid())); } diff --git a/scs-100/src/main/resources/application.yml b/scs-100/src/main/resources/application.yml index 731f1ed..dde1a0c 100644 --- a/scs-100/src/main/resources/application.yml +++ b/scs-100/src/main/resources/application.yml @@ -11,7 +11,7 @@ spring: destination: scs-100.inventoryChecking group: ${spring.application.name}-inventoryChecking-group consumer: - maxAttempts: 1 + maxAttempts: 1 # this example for simulating out of stock so there is no point of retrying after it failed in the first attempt order-dlq: destination: scs-100.ordering_dlq