diff --git a/scs-100-2/nginx/docker-compose.yml b/scs-100-2/nginx/docker-compose.yml
new file mode 100644
index 0000000..4bb0e3a
--- /dev/null
+++ b/scs-100-2/nginx/docker-compose.yml
@@ -0,0 +1,11 @@
+version: '2'
+
+services:
+ scs-100-2-nginx:
+ image: nginx:latest
+ ports:
+ - "8080:8080"
+ container_name: scs-100-2-nginx
+ volumes:
+ - ./nginx.conf:/etc/nginx/nginx.conf
+
diff --git a/scs-100-2/nginx/nginx.conf b/scs-100-2/nginx/nginx.conf
new file mode 100644
index 0000000..63b9b09
--- /dev/null
+++ b/scs-100-2/nginx/nginx.conf
@@ -0,0 +1,14 @@
+events { }
+stream {
+ upstream loadbalance {
+ server host.docker.internal:8081;
+ server host.docker.internal:8082;
+ }
+
+ server {
+ listen 8080;
+ proxy_pass loadbalance;
+ }
+
+}
+
diff --git a/scs-100-2/pom.xml b/scs-100-2/pom.xml
index 08889ec..eff7d40 100644
--- a/scs-100-2/pom.xml
+++ b/scs-100-2/pom.xml
@@ -39,6 +39,11 @@
spring-cloud-stream-binder-kafka-streams
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
org.projectlombok
lombok
diff --git a/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java
index 4a4133b..d6a9f43 100644
--- a/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java
+++ b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java
@@ -4,10 +4,12 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Value;
@@ -15,6 +17,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQuerySer
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
import java.util.Optional;
import java.util.UUID;
@@ -40,9 +43,20 @@ public class OrderService implements OrderTopology {
return orderUuid -> {
final ReadOnlyKeyValueStore store =
interactiveQueryService.getQueryableStore(Application.STATE_STORE_NAME, QueryableStoreTypes.keyValueStore());
+ HostInfo hostInfo = interactiveQueryService.getHostInfo(Application.STATE_STORE_NAME,
+ orderUuid, new UUIDSerializer());
- return OrderStatus.valueOf(Optional.ofNullable(store.get(orderUuid))
- .orElseThrow(() -> new OrderNotFoundException("Order not found")));
+ log.debug("key located in: {}", hostInfo);
+ if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
+ //get it from current app store
+ return OrderStatus.valueOf(Optional.ofNullable(store.get(orderUuid))
+ .orElseThrow(() -> new OrderNotFoundException("Order not found")));
+ } else {
+ //get it from remote app store
+ return new RestTemplate().getForEntity(
+ String.format("%s://%s:%d/order/status/%s", "http", hostInfo.host(), hostInfo.port(), orderUuid)
+ , OrderStatus.class).getBody();
+ }
};
}
diff --git a/scs-100-2/src/main/resources/application.yml b/scs-100-2/src/main/resources/application.yml
index bd2db19..1c17419 100644
--- a/scs-100-2/src/main/resources/application.yml
+++ b/scs-100-2/src/main/resources/application.yml
@@ -29,13 +29,17 @@ spring.cloud.stream:
shippedConsumer-in-0.consumer.configuration.application.id: ${spring.application.name}-shipped
binder:
brokers: localhost:9092 # just to use it in the service app, Its already 'localhost:9092' by default
- autoAddPartitions: true
- minPartitionCount: 6
+ auto-add-partitions: true
+ min-partition-count: 6
+ state-store-retry:
+ max-attempts: 10
+ backoff-period: 500
configuration:
application.id: ${spring.application.name}
+ application.server: localhost:${server.port} # for InteractiveQueryService to describe itself
state.dir: state-${spring.application.name}-${server.port} # to give a unique dir name in case you run multiple of this app on the same machine
default.key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
- commit.interval.ms: 100
+ commit.interval.ms: 1000
auto.offset.reset: latest
logging.level.com.ehsaniara.scs_kafka_intro: debug