diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..9985d29
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,20 @@
+version: '3'
+services:
+ kafka:
+ image: wurstmeister/kafka
+ container_name: kafka-scs-kafka-intro
+ ports:
+ - "9092:9092"
+ environment:
+ - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
+ - KAFKA_ADVERTISED_PORT=9092
+ - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+ depends_on:
+ - zookeeper
+ zookeeper:
+ image: wurstmeister/zookeeper
+ container_name: zookeeper-scs-kafka-intro
+ ports:
+ - "2181:2181"
+ environment:
+ - KAFKA_ADVERTISED_HOST_NAME=zookeeper
diff --git a/scs-099/.gitignore b/scs-099/.gitignore
new file mode 100644
index 0000000..549e00a
--- /dev/null
+++ b/scs-099/.gitignore
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/scs-099/docker-compose.yml b/scs-099/docker-compose.yml
new file mode 100644
index 0000000..347e011
--- /dev/null
+++ b/scs-099/docker-compose.yml
@@ -0,0 +1,19 @@
+version: '3'
+services:
+ kafka:
+ image: wurstmeister/kafka
+ container_name: kafka-mc
+ ports:
+ - "9092:9092"
+ environment:
+ - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
+ - KAFKA_ADVERTISED_PORT=9092
+ - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+ depends_on:
+ - zookeeper
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - "2181:2181"
+ environment:
+ - KAFKA_ADVERTISED_HOST_NAME=zookeeper
diff --git a/scs-099/pom.xml b/scs-099/pom.xml
new file mode 100644
index 0000000..4d95df5
--- /dev/null
+++ b/scs-099/pom.xml
@@ -0,0 +1,72 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.4.5
+
+
+
+ com.ehsaniara.scs_kafka_intro
+ scs-099
+ 0.0.1-SNAPSHOT
+ scs-099
+ Demo project for Spring Boot
+
+
+ 11
+ Hoxton.SR11
+ ${project.parent.version}
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-kafka
+
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+
+
+
diff --git a/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java
new file mode 100644
index 0000000..2e3b755
--- /dev/null
+++ b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/Application.java
@@ -0,0 +1,74 @@
+package com.ehsaniara.scs_kafka_intro.scs099;
+
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.util.MimeTypeUtils;
+
+import java.util.stream.IntStream;
+
+@EnableScheduling
+@EnableBinding(value = {MyBinder.class})
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+ @Slf4j
+ @Service
+ @RequiredArgsConstructor
+ public static class PobSub {
+
+ private final MyBinder myBinder;
+
+ @Value("${server.port:8080}")
+ private int port;
+
+ //counter for every Scheduled attempt
+ private int counter;
+
+ @Scheduled(initialDelay = 5_000, fixedDelay = 5_000)
+ public void producer() {
+ //the producer works just for the app that runs at port 8080
+ if (port == 8080) {
+ // 10 iterative loop
+ IntStream.range(0, 10)
+ .forEach(value -> {
+ //this is our Message payload
+ String message = String.format("TestString of %s - %s", counter, value);
+
+ //here is out message publisher in the given channel into topic "scs-099.order"
+ myBinder.orderOut()
+ .send(MessageBuilder.withPayload(message)
+ .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
+ .build());
+ //to show what we have produced in kafka ("warn" to show in different color on the console)
+ log.warn("message produced: {}", message);
+ });
+ counter++;
+ }
+ }
+
+ @SneakyThrows
+ @StreamListener(MyBinder.ORDER_IN)
+ public void consumer(@Payload String message) {
+ //simulate 200ms delay when consumer is working in some task
+ Thread.sleep(200);
+ log.debug("message consumed: {}", message);
+ }
+ }
+
+}
diff --git a/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java
new file mode 100644
index 0000000..d834eed
--- /dev/null
+++ b/scs-099/src/main/java/com/ehsaniara/scs_kafka_intro/scs099/MyBinder.java
@@ -0,0 +1,19 @@
+package com.ehsaniara.scs_kafka_intro.scs099;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+public interface MyBinder {
+
+ // channels
+ String ORDER_IN = "order-in";
+ String ORDER_OUT = "order-out";
+
+ @Input(ORDER_IN)
+ SubscribableChannel orderIn();
+
+ @Output(ORDER_OUT)
+ MessageChannel orderOut();
+}
diff --git a/scs-099/src/main/resources/application.yml b/scs-099/src/main/resources/application.yml
new file mode 100644
index 0000000..8d9ce46
--- /dev/null
+++ b/scs-099/src/main/resources/application.yml
@@ -0,0 +1,40 @@
+spring:
+ application:
+ name: scs-099
+
+ cloud.stream.bindings:
+ order-out.destination: scs-099.order # Topic Name
+ order-in.destination: scs-099.order # Topic Name
+
+# To see the DEBUG level logs in console
+logging.level.com.ehsaniara.scs_kafka_intro: debug
+---
+spring:
+ config:
+ activate:
+ on-profile: "test2"
+
+ cloud.stream.bindings:
+ order-out.producer.partition-count: 10
+ order-in:
+ group: ${spring.application.name}-shipping-group
+ consumer.concurrency: 1
+
+ cloud.stream.kafka.binder:
+ autoAddPartitions: true
+---
+spring:
+ config:
+ activate:
+ on-profile: "test3"
+
+ cloud.stream.bindings:
+ order-out.producer.partition-count: 10
+ order-in:
+ group: ${spring.application.name}-shipping-group
+ consumer.concurrency: 3
+
+ cloud.stream.kafka.binder:
+ autoAddPartitions: true
+
+
diff --git a/scs-100/README.MD b/scs-100/README.MD
index f97578c..ac09a0e 100644
--- a/scs-100/README.MD
+++ b/scs-100/README.MD
@@ -1,6 +1,8 @@
# SCS-100
-A basic Example of an Event Driven Flow by the help of **SPRING CLOUD STREAM KAFKA**
+## PubSub Mechanism
+
+A simple Example of an Event Driven Flow by the help of **SPRING CLOUD STREAM KAFKA**
##### properties
diff --git a/scs-100/pom.xml b/scs-100/pom.xml
index 38dfb43..029902d 100644
--- a/scs-100/pom.xml
+++ b/scs-100/pom.xml
@@ -34,22 +34,11 @@
spring-boot-starter-web
-
- org.springframework.cloud
- spring-cloud-stream
-
-
org.springframework.cloud
spring-cloud-stream-binder-kafka
-
- org.springframework.boot
- spring-boot-configuration-processor
- true
-
-
org.projectlombok
lombok
diff --git a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java
index f40f824..24dea7b 100644
--- a/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java
+++ b/scs-100/src/main/java/com/ehsaniara/scs_kafka_intro/scs100/OrderController.java
@@ -1,18 +1,11 @@
package com.ehsaniara.scs_kafka_intro.scs100;
import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.WebRequest;
-import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
import javax.validation.constraints.NotNull;
import java.util.UUID;
-@Slf4j
@RestController
@AllArgsConstructor
public class OrderController {
@@ -21,33 +14,11 @@ public class OrderController {
@PostMapping("order")
public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) {
-
return orderService.placeOrder(order);
}
@GetMapping("order/status/{orderUuid}")
public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) {
-
return orderService.statusCheck(orderUuid);
}
-
- @ControllerAdvice
- public static class RestResponseEntityExceptionHandler
- extends ResponseEntityExceptionHandler {
-
- @ExceptionHandler({OrderFailedException.class})
- public ResponseEntity