Adding Kafka Topology on 100-2
This commit is contained in:
@@ -21,10 +21,6 @@ import java.util.UUID;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static com.ehsaniara.scs_kafka_intro.scs1002.Application.STATE_STORE_NAME;
|
||||
import static com.ehsaniara.scs_kafka_intro.scs1002.OrderStatus.PENDING;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@@ -43,7 +39,7 @@ public class OrderService implements OrderTopology {
|
||||
public Function<UUID, OrderStatus> statusCheck() {
|
||||
return orderUuid -> {
|
||||
final ReadOnlyKeyValueStore<UUID, String> store =
|
||||
interactiveQueryService.getQueryableStore(STATE_STORE_NAME, QueryableStoreTypes.keyValueStore());
|
||||
interactiveQueryService.getQueryableStore(Application.STATE_STORE_NAME, QueryableStoreTypes.keyValueStore());
|
||||
|
||||
return OrderStatus.valueOf(Optional.ofNullable(store.get(orderUuid))
|
||||
.orElseThrow(() -> new OrderNotFoundException("Order not found")));
|
||||
@@ -56,7 +52,7 @@ public class OrderService implements OrderTopology {
|
||||
var order = Order.builder()//
|
||||
.itemName(orderIn.getItemName())//
|
||||
.orderUuid(UUID.randomUUID())//
|
||||
.orderStatus(PENDING)//
|
||||
.orderStatus(OrderStatus.PENDING)//
|
||||
.build();
|
||||
|
||||
//producer
|
||||
@@ -117,8 +113,6 @@ public class OrderService implements OrderTopology {
|
||||
@Bean
|
||||
public Consumer<KStream<UUID, Order>> shippedConsumer() {
|
||||
return input -> input
|
||||
.foreach((key, value) -> {
|
||||
log.debug("THIS IS THE END! key: {} value: {}", key, value);
|
||||
});
|
||||
.foreach((key, value) -> log.debug("THIS IS THE END! key: {} value: {}", key, value));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user