diff --git a/kafka-streams-samples/kafka-streams-product-tracker/pom.xml b/kafka-streams-samples/kafka-streams-product-tracker/pom.xml
index 0e41689..e27b2a5 100644
--- a/kafka-streams-samples/kafka-streams-product-tracker/pom.xml
+++ b/kafka-streams-samples/kafka-streams-product-tracker/pom.xml
@@ -11,23 +11,62 @@
Demo project for Spring Boot
- io.spring.cloud.stream.sample
- spring-cloud-stream-samples-parent
- 0.0.1-SNAPSHOT
- ../..
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+ Hoxton.BUILD-SNAPSHOT
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
org.springframework.cloud
spring-cloud-stream-binder-kafka-streams
-
org.springframework.boot
spring-boot-starter-test
test
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
+
+ org.apache.kafka
+ kafka-streams-test-utils
+ ${kafka.version}
+ test
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
@@ -39,4 +78,54 @@
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+ spring-releases
+ Spring Releases
+ https://repo.spring.io/libs-release-local
+
+ false
+
+
+
diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java
index 7d36391..39113df 100644
--- a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java
+++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java
@@ -16,29 +16,28 @@
package kafka.streams.product.tracker;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
+import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.StringUtils;
-import java.time.Instant;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.util.Set;
-import java.util.stream.Collectors;
-
@SpringBootApplication
public class KafkaStreamsProductTrackerApplication {
@@ -46,21 +45,19 @@ public class KafkaStreamsProductTrackerApplication {
SpringApplication.run(KafkaStreamsProductTrackerApplication.class, args);
}
- @EnableBinding(KafkaStreamsProcessor.class)
@EnableConfigurationProperties(ProductTrackerProperties.class)
public static class ProductCountApplication {
@Autowired
ProductTrackerProperties productTrackerProperties;
- @StreamListener("input")
- @SendTo("output")
- public KStream process(KStream