updating scs-099 tutorial

Jay Ehsaniara
2021-05-16 19:02:05 -07:00
parent 3e66892087
commit 773009658f
12 changed files with 283 additions and 43 deletions

docker-compose.yml Normal file

@@ -0,0 +1,20 @@
version: '3'
services:
  kafka:
    image: wurstmeister/kafka
    container_name: kafka-scs-kafka-intro
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper-scs-kafka-intro
    ports:
      - "2181:2181"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=zookeeper

scs-099/.gitignore vendored Normal file

@@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/


@@ -0,0 +1,19 @@
version: '3'
services:
  kafka:
    image: wurstmeister/kafka
    container_name: kafka-mc
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=zookeeper

scs-099/pom.xml Normal file

@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ehsaniara.scs_kafka_intro</groupId>
    <artifactId>scs-099</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>scs-099</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Hoxton.SR11</spring-cloud.version>
        <spring-boot.version>${project.parent.version}</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


@@ -0,0 +1,74 @@
package com.ehsaniara.scs_kafka_intro.scs099;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import java.util.stream.IntStream;

@EnableScheduling
@EnableBinding(value = {MyBinder.class})
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Slf4j
    @Service
    @RequiredArgsConstructor
    public static class PubSub {

        private final MyBinder myBinder;

        @Value("${server.port:8080}")
        private int port;

        // counter for every scheduled attempt
        private int counter;

        @Scheduled(initialDelay = 5_000, fixedDelay = 5_000)
        public void producer() {
            // only the instance running on port 8080 acts as the producer
            if (port == 8080) {
                // publish 10 messages per scheduled run
                IntStream.range(0, 10)
                        .forEach(value -> {
                            // this is our message payload
                            String message = String.format("TestString of %s - %s", counter, value);
                            // publish the message on the output channel, bound to the "scs-099.order" topic
                            myBinder.orderOut()
                                    .send(MessageBuilder.withPayload(message)
                                            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                                            .build());
                            // log what was produced to Kafka ("warn" so it shows in a different color on the console)
                            log.warn("message produced: {}", message);
                        });
                counter++;
            }
        }

        @SneakyThrows
        @StreamListener(MyBinder.ORDER_IN)
        public void consumer(@Payload String message) {
            // simulate 200ms of work while the consumer processes the message
            Thread.sleep(200);
            log.debug("message consumed: {}", message);
        }
    }
}


@@ -0,0 +1,19 @@
package com.ehsaniara.scs_kafka_intro.scs099;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MyBinder {

    // channels
    String ORDER_IN = "order-in";
    String ORDER_OUT = "order-out";

    @Input(ORDER_IN)
    SubscribableChannel orderIn();

    @Output(ORDER_OUT)
    MessageChannel orderOut();
}


@@ -0,0 +1,40 @@
spring:
  application:
    name: scs-099
  cloud.stream.bindings:
    order-out.destination: scs-099.order # Topic Name
    order-in.destination: scs-099.order # Topic Name

# To see the DEBUG level logs in the console
logging.level.com.ehsaniara.scs_kafka_intro: debug
---
spring:
  config:
    activate:
      on-profile: "test2"
  cloud.stream.bindings:
    order-out.producer.partition-count: 10
    order-in:
      group: ${spring.application.name}-shipping-group
      consumer.concurrency: 1
  cloud.stream.kafka.binder:
    autoAddPartitions: true
---
spring:
  config:
    activate:
      on-profile: "test3"
  cloud.stream.bindings:
    order-out.producer.partition-count: 10
    order-in:
      group: ${spring.application.name}-shipping-group
      consumer.concurrency: 3
  cloud.stream.kafka.binder:
    autoAddPartitions: true
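
The test2 and test3 profiles above differ only in consumer.concurrency (1 vs 3). With 10 partitions on the scs-099.order topic and a consumer that spends roughly 200 ms per message, one consumer thread tops out at about 5 messages per second, so raising the concurrency to 3 roughly triples the throughput of a single instance. The sketch below is a hypothetical variant of the consumer method in Application.PubSub (not part of this commit) that also logs which partition each record arrived on, making the partition distribution across the concurrent threads visible in the logs; it assumes the same Lombok and Spring Cloud Stream setup as Application.java.

package com.ehsaniara.scs_kafka_intro.scs099;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class PartitionAwareConsumer {

    // Illustrative replacement for the consumer in Application.PubSub:
    // logs the Kafka partition of each record next to the payload.
    @SneakyThrows
    @StreamListener(MyBinder.ORDER_IN)
    public void consumer(@Payload String message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        Thread.sleep(200); // same simulated 200 ms of work as the original consumer
        log.debug("message consumed from partition {}: {}", partition, message);
    }
}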


@@ -1,6 +1,8 @@
# SCS-100
A basic example of an event-driven flow built with **SPRING CLOUD STREAM KAFKA**
## PubSub Mechanism
A simple example of an event-driven flow built with **SPRING CLOUD STREAM KAFKA**
##### properties


@@ -34,22 +34,11 @@
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>

@@ -1,18 +1,11 @@
package com.ehsaniara.scs_kafka_intro.scs100;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;

import javax.validation.constraints.NotNull;
import java.util.UUID;

@Slf4j
@RestController
@AllArgsConstructor
public class OrderController {
@@ -21,33 +14,11 @@ public class OrderController {
    @PostMapping("order")
    public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) {
        return orderService.placeOrder(order);
    }

    @GetMapping("order/status/{orderUuid}")
    public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) {
        return orderService.statusCheck(orderUuid);
    }

    @ControllerAdvice
    public static class RestResponseEntityExceptionHandler
            extends ResponseEntityExceptionHandler {

        @ExceptionHandler({OrderFailedException.class})
        public ResponseEntity<Object> orderErrorException(
                Exception ex, WebRequest request) {
            return new ResponseEntity<Object>(
                    "I_AM_A_TEAPOT", new HttpHeaders(), HttpStatus.I_AM_A_TEAPOT);
        }

        @ExceptionHandler({OrderNotFoundException.class})
        public ResponseEntity<Object> orderNotFoundException(
                Exception ex, WebRequest request) {
            return new ResponseEntity<Object>(
                    "NOT_FOUND", new HttpHeaders(), HttpStatus.NOT_FOUND);
        }
    }
}
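
The @ControllerAdvice above maps two domain exceptions to HTTP responses, but neither class appears in this diff. The sketch below shows an assumed shape for them (the repository's actual implementations may differ); the only requirement visible from the surrounding code is that OrderFailedException accepts a message string, since OrderService throws it with String.format(...). They are shown together here for brevity; in the project they would live in separate files.

package com.ehsaniara.scs_kafka_intro.scs100;

// Assumed shapes only, not part of this commit.
class OrderFailedException extends RuntimeException {
    OrderFailedException(String message) {
        super(message);
    }
}

class OrderNotFoundException extends RuntimeException {
    OrderNotFoundException(String message) {
        super(message);
    }
}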


@@ -59,7 +59,7 @@ public class OrderService {
    }

    /**
     * check inventory System for Item availability.
     * checking inventory System for Item availability.
     * this is a third-party service simulation and
     * let's say it takes around 5 seconds to check your inventory
     */
@@ -76,6 +76,7 @@ public class OrderService {
        if (System.currentTimeMillis() % 2 == 0) {
            orderIn.setOrderStatus(OrderStatus.INSUFFICIENT_INVENTORY);
            orderDataBase.put(orderIn.getOrderUuid(), orderIn);
            log.warn("Let's assume we ran out of stock for item: {}", orderIn.getItemName());
            throw new OrderFailedException(String.format("insufficient inventory for order: %s", orderIn.getOrderUuid()));
        }


@@ -11,7 +11,7 @@ spring:
          destination: scs-100.inventoryChecking
          group: ${spring.application.name}-inventoryChecking-group
          consumer:
            maxAttempts: 1
            maxAttempts: 1 # this example simulates running out of stock, so there is no point in retrying after the first attempt fails
        order-dlq:
          destination: scs-100.ordering_dlq