diff --git a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java index 95c8187..ce66655 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java +++ b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java @@ -17,8 +17,12 @@ package kafka.streams.product.tracker; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.beans.factory.annotation.Autowired; @@ -28,7 +32,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry; +import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.messaging.handler.annotation.SendTo; @@ -54,7 +58,8 @@ public class KafkaStreamsInteractiveQueryApplication { private static final String STORE_NAME = "prod-id-count-store"; @Autowired - private QueryableStoreRegistry queryableStoreRegistry; + private InteractiveQueryService queryService; + @Autowired ProductTrackerProperties productTrackerProperties; @@ -68,8 +73,10 @@ public class KafkaStreamsInteractiveQueryApplication { return input .filter((key, product) -> productIds().contains(product.getId())) .map((key, value) -> new KeyValue<>(value.id, value)) - .groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class)) - .count(STORE_NAME) + .groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class))) + .count(Materialized.>as(STORE_NAME) + .withKeySerde(Serdes.Integer()) + .withValueSerde(Serdes.Long())) .toStream(); } @@ -82,7 +89,7 @@ public class KafkaStreamsInteractiveQueryApplication { @Scheduled(fixedRate = 30000, initialDelay = 5000) public void printProductCounts() { if (keyValueStore == null) { - keyValueStore = queryableStoreRegistry.getQueryableStoreType(STORE_NAME, QueryableStoreTypes.keyValueStore()); + keyValueStore = queryService.getQueryableStore(STORE_NAME, QueryableStoreTypes.keyValueStore()); } for (Integer id : productIds()) {