Update interactive query sample

This commit is contained in:
Soby Chacko
2019-10-28 21:47:05 -04:00
parent ca18443d8d
commit 2ca8568b55
3 changed files with 107 additions and 22 deletions

View File

@@ -11,12 +11,28 @@
<description>Spring Cloud Stream sample for KStream interactive queries</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
@@ -27,6 +43,30 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
@@ -38,4 +78,55 @@
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>

View File

@@ -16,12 +16,16 @@
package kafka.streams.product.tracker;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -30,19 +34,13 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.StringUtils;
import java.util.Set;
import java.util.stream.Collectors;
@SpringBootApplication
public class KafkaStreamsInteractiveQueryApplication {
@@ -50,7 +48,6 @@ public class KafkaStreamsInteractiveQueryApplication {
SpringApplication.run(KafkaStreamsInteractiveQueryApplication.class, args);
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableConfigurationProperties(ProductTrackerProperties.class)
@EnableScheduling
public static class InteractiveProductCountApplication {
@@ -60,20 +57,18 @@ public class KafkaStreamsInteractiveQueryApplication {
@Autowired
private InteractiveQueryService queryService;
@Autowired
ProductTrackerProperties productTrackerProperties;
ReadOnlyKeyValueStore<Object, Object> keyValueStore;
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
@Bean
public Function<KStream<Object, Product>, KStream<Integer, Long>> process() {
return input
return input -> input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
.groupByKey(Grouped.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.Long()))

View File

@@ -1,8 +1,7 @@
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.bindings.output:
spring.cloud.stream.bindings.process-out-0:
destination: product-counts
spring.cloud.stream.bindings.input:
spring.cloud.stream.bindings.process-in-0:
destination: products
spring.cloud.stream.kafka.streams.binder:
brokers: localhost
spring.application.name: kafka-streams-iq-basic-sample