Adding Kafka Topology on 100-2

This commit is contained in:
Jay Ehsaniara
2021-06-28 11:59:50 -07:00
parent 05958e5736
commit 42528654d8
4 changed files with 238 additions and 0 deletions

View File

@@ -0,0 +1,26 @@
package com.ehsaniara.scs_kafka_intro.scs1002;
import java.util.UUID;

import javax.validation.constraints.NotNull;

import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
// FIX: without @Validated on the class, the @NotNull constraint on the
// placeOrder parameter is never evaluated (Spring only applies method-level
// bean validation to @Validated beans), so the "Invalid Order" message
// could never be produced.
@Validated
@RestController
@AllArgsConstructor
public class OrderController {

    private final OrderService orderService;

    /**
     * Places a new order.
     *
     * @param order the incoming order payload; only its item name is used
     *              downstream (a fresh UUID and PENDING status are assigned)
     * @return the created order as accepted by the service
     */
    @PostMapping("order")
    public Order placeOrder(@RequestBody @NotNull(message = "Invalid Order") Order order) {
        return orderService.placeOrder().apply(order);
    }

    /**
     * Returns the current status of a previously placed order.
     *
     * @param orderUuid the UUID returned by {@link #placeOrder}
     * @return the latest known status for that order
     */
    @GetMapping("order/status/{orderUuid}")
    public OrderStatus statusCheck(@PathVariable("orderUuid") UUID orderUuid) {
        return orderService.statusCheck().apply(orderUuid);
    }
}

View File

@@ -0,0 +1,124 @@
package com.ehsaniara.scs_kafka_intro.scs1002;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import static com.ehsaniara.scs_kafka_intro.scs1002.Application.STATE_STORE_NAME;
import static com.ehsaniara.scs_kafka_intro.scs1002.OrderStatus.PENDING;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService implements OrderTopology {

    private final InteractiveQueryService interactiveQueryService;
    private final Serde<Order> orderJsonSerde;

    // Topic new orders are published to; same destination the orderAggConsumer
    // binding reads from, so the status pipeline sees every placed order.
    @Value("${spring.cloud.stream.bindings.orderAggConsumer-in-0.destination}")
    private String orderTopic;

    @Value("${spring.cloud.stream.kafka.streams.binder.brokers}")
    private String bootstrapServer;

    /**
     * Looks up the latest known status of an order from the Kafka Streams
     * state store populated by {@code kStreamKTableStringFunction}.
     *
     * @return a function mapping an order UUID to its {@link OrderStatus};
     *         throws {@link OrderNotFoundException} when the store has no
     *         entry for that UUID
     */
    public Function<UUID, OrderStatus> statusCheck() {
        return orderUuid -> {
            final ReadOnlyKeyValueStore<UUID, String> store =
                    interactiveQueryService.getQueryableStore(STATE_STORE_NAME, QueryableStoreTypes.keyValueStore());
            return OrderStatus.valueOf(Optional.ofNullable(store.get(orderUuid))
                    .orElseThrow(() -> new OrderNotFoundException("Order not found")));
        };
    }

    /**
     * Creates a new order (fresh UUID, PENDING status), publishes it to the
     * order topic, and returns it to the caller.
     */
    public Function<Order, Order> placeOrder() {
        return orderIn -> {
            // create an order
            var order = Order.builder()//
                    .itemName(orderIn.getItemName())//
                    .orderUuid(UUID.randomUUID())//
                    .orderStatus(PENDING)//
                    .build();

            // FIX: plain construction instead of double-brace initialization,
            // which allocated a pointless anonymous KafkaTemplate subclass
            // (holding an implicit reference to this service) on every call.
            var kafkaTemplate = new KafkaTemplate<>(orderJsonSerdeFactoryFunction
                    .apply(orderJsonSerde.serializer(), bootstrapServer), true);
            kafkaTemplate.setDefaultTopic(orderTopic);
            kafkaTemplate.sendDefault(order.getOrderUuid(), order);

            return order;
        };
    }

    /**
     * Folds the order stream into the status state store (via
     * {@code kStreamKTableStringFunction}) and forwards the original orders
     * so the processing flow continues downstream.
     */
    @Bean
    public Function<KStream<UUID, Order>, KStream<UUID, Order>> orderAggConsumer() {
        return uuidOrderKStream -> {
            KTable<UUID, String> uuidStringKTable = kStreamKTableStringFunction.apply(uuidOrderKStream);
            //then join the stream with its original stream to keep the flow
            return uuidOrderKStream.leftJoin(uuidStringKTable,
                    (order, status) -> order,
                    Joined.with(Serdes.UUID(), orderJsonSerde, Serdes.String()));
        };
    }

    /**
     * Routes each order to one of three output bindings (inventory check,
     * shipping, shipped) according to its current status, after a simulated
     * processing delay.
     */
    @Bean
    @SuppressWarnings("unchecked")
    public Function<KStream<UUID, Order>, KStream<UUID, Order>[]> orderProcess() {
        return input -> input
                .map((uuid, order) -> {
                    try {
                        //create fake delay
                        Thread.sleep(5_000);
                    } catch (InterruptedException e) {
                        // FIX: restore the interrupt flag instead of swallowing
                        // the exception with printStackTrace().
                        Thread.currentThread().interrupt();
                    }
                    return new KeyValue<>(uuid, order);
                })
                .map(KeyValue::new)
                .branch(isOrderMadePredicate, isInventoryCheckedPredicate, isShippedPredicate);
    }

    /**
     * Marks orders as INVENTORY_CHECKING and passes them along; the
     * out-binding feeds the status topic (see application config).
     */
    @Bean
    public Function<KStream<UUID, Order>, KStream<UUID, Order>> inventoryCheck() {
        return input -> input
                .peek((key, value) -> value.setOrderStatus(OrderStatus.INVENTORY_CHECKING))
                .peek((uuid, order) -> log.debug("Order {} is getting checked in the inventory", uuid))
                .map(KeyValue::new);
    }

    /** Marks orders as SHIPPED and passes them along to the status topic. */
    @Bean
    public Function<KStream<UUID, Order>, KStream<UUID, Order>> shipping() {
        return input -> input
                .peek((key, value) -> value.setOrderStatus(OrderStatus.SHIPPED))
                .map(KeyValue::new);
    }

    /** Terminal consumer for fully shipped orders; logs and drops them. */
    @Bean
    public Consumer<KStream<UUID, Order>> shippedConsumer() {
        return input -> input
                .foreach((key, value) -> {
                    log.debug("THIS IS THE END! key: {} value: {}", key, value);
                });
    }
}

View File

@@ -0,0 +1,47 @@
package com.ehsaniara.scs_kafka_intro.scs1002;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
public interface OrderTopology {
Predicate<UUID, Order> isOrderMadePredicate = (k, v) -> v.getOrderStatus().equals(OrderStatus.PENDING);
Predicate<UUID, Order> isInventoryCheckedPredicate = (k, v) -> v.getOrderStatus().equals(OrderStatus.INVENTORY_CHECKING);
Predicate<UUID, Order> isShippedPredicate = (k, v) -> v.getOrderStatus().equals(OrderStatus.SHIPPED);
Function<KStream<UUID, Order>, KTable<UUID, String>> kStreamKTableStringFunction = input -> input
.groupBy((s, order) -> order.getOrderUuid(),
Grouped.with(null, new JsonSerde<>(Order.class, new ObjectMapper())))
.aggregate(
String::new,
(s, order, oldStatus) -> order.getOrderStatus().toString(),
Materialized.<UUID, String, KeyValueStore<Bytes, byte[]>>as(Application.STATE_STORE_NAME)
.withKeySerde(Serdes.UUID()).
withValueSerde(Serdes.String())
);
//Just the min req vars
BiFunction<Serializer<Order>, String, DefaultKafkaProducerFactory<UUID, Order>> orderJsonSerdeFactoryFunction
= (orderSerde, bootstrapServer) -> new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
ProducerConfig.RETRIES_CONFIG, 0,
ProducerConfig.BATCH_SIZE_CONFIG, 16384,
ProducerConfig.LINGER_MS_CONFIG, 1,
ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, orderSerde.getClass()));
}

View File

@@ -0,0 +1,41 @@
spring:
application.name: scs-100-2
cloud.function.definition: orderAggConsumer;orderProcess;inventoryCheck;shipping;shippedConsumer
spring.cloud.stream:
bindings:
orderAggConsumer-in-0.destination: scs-100-2.orderStatus
orderAggConsumer-out-0.destination: scs-100-2.orderProcess
orderProcess-in-0.destination: scs-100-2.orderProcess
orderProcess-out-0.destination: scs-100-2.inventoryCheck
orderProcess-out-1.destination: scs-100-2.shipping
orderProcess-out-2.destination: scs-100-2.shipped
inventoryCheck-in-0.destination: scs-100-2.inventoryCheck
inventoryCheck-out-0.destination: scs-100-2.orderStatus
shipping-in-0.destination: scs-100-2.shipping
shipping-out-0.destination: scs-100-2.orderStatus
shippedConsumer-in-0.destination: scs-100-2.shipped
kafka.streams:
bindings:
orderAggConsumer-in-0.consumer.configuration.application.id: ${spring.application.name}-orderAggConsumer
orderProcess-in-0.consumer.configuration.application.id: ${spring.application.name}-orderProcess
inventoryCheck-in-0.consumer.configuration.application.id: ${spring.application.name}-inventoryCheck
shipping-in-0.consumer.configuration.application.id: ${spring.application.name}-shipping
shippedConsumer-in-0.consumer.configuration.application.id: ${spring.application.name}-shipped
binder:
brokers: localhost:9092 # just to use it in the service app; it's already 'localhost:9092' by default
autoAddPartitions: true
configuration:
application.id: ${spring.application.name}
state.dir: ${spring.application.name}${server.port} # to give a unique dir name in case you run multiple instances of this app on the same machine
default.key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000
auto.offset.reset: latest
server.port: 8080
logging.level.com.ehsaniara.scs_kafka_intro: debug