diff --git a/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderController.java b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderController.java new file mode 100644 index 0000000..30752cd --- /dev/null +++ b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderController.java @@ -0,0 +1,26 @@ +package com.ehsaniara.scs_kafka_intro.scs1002; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import javax.validation.constraints.NotNull; +import java.util.UUID; + +@Slf4j +@RestController +@AllArgsConstructor +public class OrderController { + + private final OrderService orderService; + + @PostMapping("order") + public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) { + return orderService.placeOrder().apply(order); + } + + @GetMapping("order/status/{orderUuid}") + public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) { + return orderService.statusCheck().apply(orderUuid); + } +} diff --git a/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java new file mode 100644 index 0000000..e7843bc --- /dev/null +++ b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderService.java @@ -0,0 +1,124 @@ +package com.ehsaniara.scs_kafka_intro.scs1002; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; + +import static com.ehsaniara.scs_kafka_intro.scs1002.Application.STATE_STORE_NAME; +import static com.ehsaniara.scs_kafka_intro.scs1002.OrderStatus.PENDING; + + +@Slf4j +@Service +@RequiredArgsConstructor +public class OrderService implements OrderTopology { + + private final InteractiveQueryService interactiveQueryService; + + private final Serde orderJsonSerde; + + @Value("${spring.cloud.stream.bindings.orderAggConsumer-in-0.destination}") + private String orderTopic; + + @Value("${spring.cloud.stream.kafka.streams.binder.brokers}") + private String bootstrapServer; + + public Function statusCheck() { + return orderUuid -> { + final ReadOnlyKeyValueStore store = + interactiveQueryService.getQueryableStore(STATE_STORE_NAME, QueryableStoreTypes.keyValueStore()); + + return OrderStatus.valueOf(Optional.ofNullable(store.get(orderUuid)) + .orElseThrow(() -> new OrderNotFoundException("Order not found"))); + }; + } + + public Function placeOrder() { + return orderIn -> { + //create an order + var order = Order.builder()// + .itemName(orderIn.getItemName())// + .orderUuid(UUID.randomUUID())// + .orderStatus(PENDING)// + .build(); + + //producer + new KafkaTemplate<>(orderJsonSerdeFactoryFunction + .apply(orderJsonSerde.serializer(), bootstrapServer), true) {{ + setDefaultTopic(orderTopic); + sendDefault(order.getOrderUuid(), order); + }}; + return order; + }; + } + + @Bean + public Function, KStream> orderAggConsumer() { + return uuidOrderKStream -> { + KTable uuidStringKTable = kStreamKTableStringFunction.apply(uuidOrderKStream); + + //then join the stream with its original stream to keep the flow + return uuidOrderKStream.leftJoin(uuidStringKTable, + (order, status) -> order, + Joined.with(Serdes.UUID(), orderJsonSerde, Serdes.String())); + }; + } + + @Bean + @SuppressWarnings("unchecked") + public Function, KStream[]> orderProcess() { + + return input -> input + .map((uuid, order) -> { + try { + //create fake delay + Thread.sleep(5_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new KeyValue<>(uuid, order); + }) + .map(KeyValue::new) + .branch(isOrderMadePredicate, isInventoryCheckedPredicate, isShippedPredicate); + } + + @Bean + public Function, KStream> inventoryCheck() { + return input -> input + .peek((key, value) -> value.setOrderStatus(OrderStatus.INVENTORY_CHECKING)) + .peek((uuid, order) -> log.debug("Order {} is getting checked in the inventory", uuid)) + .map(KeyValue::new); + } + + @Bean + public Function, KStream> shipping() { + return input -> input + .peek((key, value) -> value.setOrderStatus(OrderStatus.SHIPPED)) + .map(KeyValue::new); + } + + @Bean + public Consumer> shippedConsumer() { + return input -> input + .foreach((key, value) -> { + log.debug("THIS IS THE END! key: {} value: {}", key, value); + }); + } +} diff --git a/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderTopology.java b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderTopology.java new file mode 100644 index 0000000..0eba85f --- /dev/null +++ b/scs-100-2/src/main/java/com/ehsaniara/scs_kafka_intro/scs1002/OrderTopology.java @@ -0,0 +1,47 @@ +package com.ehsaniara.scs_kafka_intro.scs1002; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.UUIDSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.KeyValueStore; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.Map; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.function.Function; + +public interface OrderTopology { + + Predicate isOrderMadePredicate = (k, v) -> v.getOrderStatus().equals(OrderStatus.PENDING); + Predicate isInventoryCheckedPredicate = (k, v) -> v.getOrderStatus().equals(OrderStatus.INVENTORY_CHECKING); + Predicate isShippedPredicate = (k, v) -> v.getOrderStatus().equals(OrderStatus.SHIPPED); + + Function, KTable> kStreamKTableStringFunction = input -> input + .groupBy((s, order) -> order.getOrderUuid(), + Grouped.with(null, new JsonSerde<>(Order.class, new ObjectMapper()))) + .aggregate( + String::new, + (s, order, oldStatus) -> order.getOrderStatus().toString(), + Materialized.>as(Application.STATE_STORE_NAME) + .withKeySerde(Serdes.UUID()). + withValueSerde(Serdes.String()) + ); + + //Just the min req vars + BiFunction, String, DefaultKafkaProducerFactory> orderJsonSerdeFactoryFunction + = (orderSerde, bootstrapServer) -> new DefaultKafkaProducerFactory<>(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, + ProducerConfig.RETRIES_CONFIG, 0, + ProducerConfig.BATCH_SIZE_CONFIG, 16384, + ProducerConfig.LINGER_MS_CONFIG, 1, + ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, orderSerde.getClass())); + +} diff --git a/scs-100-2/src/main/resources/application.yml b/scs-100-2/src/main/resources/application.yml new file mode 100644 index 0000000..03b7dbe --- /dev/null +++ b/scs-100-2/src/main/resources/application.yml @@ -0,0 +1,41 @@ +spring: + application.name: scs-100-2 + cloud.function.definition: orderAggConsumer;orderProcess;inventoryCheck;shipping;shippedConsumer + +spring.cloud.stream: + bindings: + orderAggConsumer-in-0.destination: scs-100-2.orderStatus + orderAggConsumer-out-0.destination: scs-100-2.orderProcess + + orderProcess-in-0.destination: scs-100-2.orderProcess + orderProcess-out-0.destination: scs-100-2.inventoryCheck + orderProcess-out-1.destination: scs-100-2.shipping + orderProcess-out-2.destination: scs-100-2.shipped + + inventoryCheck-in-0.destination: scs-100-2.inventoryCheck + inventoryCheck-out-0.destination: scs-100-2.orderStatus + + shipping-in-0.destination: scs-100-2.shipping + shipping-out-0.destination: scs-100-2.orderStatus + + shippedConsumer-in-0.destination: scs-100-2.shipped + + kafka.streams: + bindings: + orderAggConsumer-in-0.consumer.configuration.application.id: ${spring.application.name}-orderAggConsumer + orderProcess-in-0.consumer.configuration.application.id: ${spring.application.name}-orderProcess + inventoryCheck-in-0.consumer.configuration.application.id: ${spring.application.name}-inventoryCheck + shipping-in-0.consumer.configuration.application.id: ${spring.application.name}-shipping + shippedConsumer-in-0.consumer.configuration.application.id: ${spring.application.name}-shipped + binder: + brokers: localhost:9092 # just to use it in the service app, Its already 'localhost:9092' by default + autoAddPartitions: true + configuration: + application.id: ${spring.application.name} + state.dir: ${spring.application.name}${server.port} # to give a unique dir name in case you run multiple of this app on the same machine + default.key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde + default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde + commit.interval.ms: 1000 + auto.offset.reset: latest +server.port: 8080 +logging.level.com.ehsaniara.scs_kafka_intro: debug