diff --git a/kafka-streams-samples/kafka-streams-interactive-query-basic/pom.xml b/kafka-streams-samples/kafka-streams-interactive-query-basic/pom.xml index 71b81cb..a709355 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-basic/pom.xml +++ b/kafka-streams-samples/kafka-streams-interactive-query-basic/pom.xml @@ -11,12 +11,28 @@ Spring Cloud Stream sample for KStream interactive queries - io.spring.cloud.stream.sample - spring-cloud-stream-samples-parent - 0.0.1-SNAPSHOT - ../.. + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + Hoxton.BUILD-SNAPSHOT + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + org.springframework.cloud @@ -27,6 +43,30 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.apache.kafka + kafka-streams-test-utils + ${kafka.version} + test + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + @@ -38,4 +78,55 @@ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + diff --git a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java index d84ea89..28460a2 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java +++ b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/java/kafka/streams/product/tracker/KafkaStreamsInteractiveQueryApplication.java @@ -16,12 +16,16 @@ package kafka.streams.product.tracker; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -30,19 +34,13 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; -import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; +import org.springframework.context.annotation.Bean; import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.util.StringUtils; -import java.util.Set; -import java.util.stream.Collectors; - @SpringBootApplication public class KafkaStreamsInteractiveQueryApplication { @@ -50,7 +48,6 @@ public class KafkaStreamsInteractiveQueryApplication { SpringApplication.run(KafkaStreamsInteractiveQueryApplication.class, args); } - @EnableBinding(KafkaStreamsProcessor.class) @EnableConfigurationProperties(ProductTrackerProperties.class) @EnableScheduling public static class InteractiveProductCountApplication { @@ -60,20 +57,18 @@ public class KafkaStreamsInteractiveQueryApplication { @Autowired private InteractiveQueryService queryService; - @Autowired ProductTrackerProperties productTrackerProperties; ReadOnlyKeyValueStore keyValueStore; - @StreamListener("input") - @SendTo("output") - public KStream process(KStream input) { + @Bean + public Function, KStream> process() { - return input + return input -> input .filter((key, product) -> productIds().contains(product.getId())) .map((key, value) -> new KeyValue<>(value.id, value)) - .groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class))) + .groupByKey(Grouped.with(Serdes.Integer(), new JsonSerde<>(Product.class))) .count(Materialized.>as(STORE_NAME) .withKeySerde(Serdes.Integer()) .withValueSerde(Serdes.Long())) diff --git a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml index 8802443..2b5561e 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml @@ -1,8 +1,7 @@ spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 -spring.cloud.stream.bindings.output: +spring.cloud.stream.bindings.process-out-0: destination: product-counts -spring.cloud.stream.bindings.input: +spring.cloud.stream.bindings.process-in-0: destination: products spring.cloud.stream.kafka.streams.binder: - brokers: localhost spring.application.name: kafka-streams-iq-basic-sample