From 56ffe4b99dd52635bb9b3558f77f762b16dc62bb Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 28 Oct 2019 21:55:05 -0400 Subject: [PATCH] Update kafka streams product tracker sample --- .../kafka-streams-product-tracker/pom.xml | 99 ++++++++++++++++++- ...KafkaStreamsProductTrackerApplication.java | 33 +++---- .../src/main/resources/application.yml | 5 +- 3 files changed, 111 insertions(+), 26 deletions(-) diff --git a/kafka-streams-samples/kafka-streams-product-tracker/pom.xml b/kafka-streams-samples/kafka-streams-product-tracker/pom.xml index 0e41689..e27b2a5 100644 --- a/kafka-streams-samples/kafka-streams-product-tracker/pom.xml +++ b/kafka-streams-samples/kafka-streams-product-tracker/pom.xml @@ -11,23 +11,62 @@ Demo project for Spring Boot - io.spring.cloud.stream.sample - spring-cloud-stream-samples-parent - 0.0.1-SNAPSHOT - ../.. + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + Hoxton.BUILD-SNAPSHOT + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + org.springframework.cloud spring-cloud-stream-binder-kafka-streams - org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.apache.kafka + kafka-streams-test-utils + ${kafka.version} + test + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + @@ -39,4 +78,54 @@ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java index 7d36391..39113df 100644 --- a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java +++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java @@ -16,29 +16,28 @@ package kafka.streams.product.tracker; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; +import org.springframework.context.annotation.Bean; import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.util.StringUtils; -import java.time.Instant; -import java.time.LocalTime; -import java.time.ZoneId; -import java.util.Set; -import java.util.stream.Collectors; - @SpringBootApplication public class KafkaStreamsProductTrackerApplication { @@ -46,21 +45,19 @@ public class KafkaStreamsProductTrackerApplication { SpringApplication.run(KafkaStreamsProductTrackerApplication.class, args); } - @EnableBinding(KafkaStreamsProcessor.class) @EnableConfigurationProperties(ProductTrackerProperties.class) public static class ProductCountApplication { @Autowired ProductTrackerProperties productTrackerProperties; - @StreamListener("input") - @SendTo("output") - public KStream process(KStream input) { - return input + @Bean + public Function, KStream> process() { + return input -> input .filter((key, product) -> productIds().contains(product.getId())) .map((key, value) -> new KeyValue<>(value, value)) - .groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))) - .windowedBy(TimeWindows.of(60_000)) + .groupByKey(Grouped.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))) + .windowedBy(TimeWindows.of(Duration.ofSeconds(60))) .count(Materialized.as("product-counts")) .toStream() .map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id, diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml index f4c881b..53be970 100644 --- a/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml @@ -1,8 +1,7 @@ spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 -spring.cloud.stream.bindings.output: +spring.cloud.stream.bindings.process-out-0: destination: product-counts -spring.cloud.stream.bindings.input: +spring.cloud.stream.bindings.process-in-0: destination: products spring.cloud.stream.kafka.streams.binder: - brokers: localhost spring.application.name: kafka-streams-product-tracker-sample \ No newline at end of file