Update GlobalKTable binding sample

Soby Chacko
2019-10-28 21:33:24 -04:00
parent 3b0537c450
commit ca18443d8d
6 changed files with 112 additions and 37 deletions

View File

@@ -15,11 +15,9 @@ Go to the root of the repository.
`java -jar target/kafka-streams-global-table-join-0.0.1-SNAPSHOT.jar`
`docker exec -it kafka-join /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic user-regions`
`docker exec -it kafka-join-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic user-regions --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-"`
`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic user-regions --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-"`
`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.separator="-"`
`docker exec -it kafka-join-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.separator="-"`
Run the stand-alone `Producers` application to generate some data and watch the output on the console consumers above.
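Because the consumers are started with `key.separator="-"`, the `user-regions` consumer prints each record as `user-region`. A record like the one below would show up there as `alice-europe`; this is only an illustration of the record shape the application expects on that topic (the class name and the `localhost:9092` address are assumptions based on the docker-compose port mapping, and the repository's `Producers` class remains the real data generator):

```java
// Hypothetical sender, for illustration only; not part of the sample.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class RegionRecordSender {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		// user-regions is consumed by the application as a GlobalKTable<String, String>: user -> region
		KafkaTemplate<String, String> template =
				new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props), true);
		template.send("user-regions", "alice", "europe");
	}
}
```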

View File

@@ -2,7 +2,7 @@ version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka-join
container_name: kafka-join-1
ports:
- "9092:9092"
environment:

View File

@@ -11,23 +11,62 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
@@ -39,4 +78,55 @@
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
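The test dependencies added above (`spring-boot-starter-test`, `spring-kafka-test`, `kafka-streams-test-utils`) are not exercised anywhere in this commit. A minimal smoke test that would use the embedded broker from `spring-kafka-test` might look like the sketch below; the test class is an assumption, not part of the sample, and the topic names are taken from application.yml further down:

```java
// Hypothetical smoke test: it only verifies that the context, and with it the
// Kafka Streams topology, starts against an embedded broker.
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest(properties =
		"spring.cloud.stream.kafka.streams.binder.brokers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = { "user-clicks", "user-regions", "output-topic" })
class KafkaStreamsGlobalKTableJoinTests {

	@Test
	void topologyStarts() {
		// If the application context fails to start, the test fails before this body runs.
	}
}
```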

View File

@@ -16,18 +16,16 @@
package kafka.streams.globalktable.join;
import java.util.function.BiFunction;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Serialized;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.context.annotation.Bean;
/**
* This is the PR that added this sample:
@@ -40,34 +38,24 @@ public class KafkaStreamsGlobalKTableJoin {
SpringApplication.run(KafkaStreamsGlobalKTableJoin.class, args);
}
@EnableBinding(KStreamProcessorX.class)
public static class KStreamToTableJoinApplication {
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") GlobalKTable<String, String> userRegionsTable) {
@Bean
public BiFunction<KStream<String, Long>, GlobalKTable<String, String>, KStream<String, Long>> process() {
return userClicksStream
return (userClicksStream, userRegionsTable) -> userClicksStream
.leftJoin(userRegionsTable,
(name,value) -> name,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)
)
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
.toStream();
}
}
interface KStreamProcessorX extends KafkaStreamsProcessor {
@Input("inputTable")
GlobalKTable<?, ?> inputKTable();
}
private static final class RegionWithClicks {
private final String region;
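Because the old annotation-driven lines and the new functional ones are interleaved in the hunk above, here is the new processor read as one piece: the `@StreamListener` method, the `@EnableBinding` inner class, and the `KStreamProcessorX` binding interface are replaced by a single `BiFunction` bean, whose first argument is bound to the click stream, second argument to the regions `GlobalKTable`, and return value to the output binding.

```java
@Bean
public BiFunction<KStream<String, Long>, GlobalKTable<String, String>, KStream<String, Long>> process() {
	return (userClicksStream, userRegionsTable) -> userClicksStream
			// join each click event with the user's region from the global table
			.leftJoin(userRegionsTable,
					(name, value) -> name,
					(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
			// re-key by region and keep the click count as the value
			.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
					regionWithClicks.getClicks()))
			// Grouped.with replaces the deprecated Serialized.with
			.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
			.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
			.toStream();
}
```

The `RegionWithClicks` value holder, truncated at the end of this file's diff, only needs to carry the region and the click count so that `getRegion()` and `getClicks()` can rebuild the `KeyValue` in the map step.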

View File

@@ -57,7 +57,7 @@ public class Producers {
DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("user-clicks3");
template.setDefaultTopic("user-clicks");
for (KeyValue<String,Long> keyValue : userClicks) {
template.sendDefault(keyValue.key, keyValue.value);
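The `props` map feeding `DefaultKafkaProducerFactory<String, Long>` is defined above this hunk and is not shown. For a String-keyed, Long-valued producer it needs roughly the following; the broker address is an assumption matching the docker-compose port mapping:

```java
// Sketch of the producer configuration assumed by the snippet above.
// Requires org.apache.kafka.clients.producer.ProducerConfig and
// org.apache.kafka.common.serialization.{StringSerializer, LongSerializer}.
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
```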

View File

@@ -1,15 +1,14 @@
spring.application.name: stream-global-table-sample
spring.cloud.stream.bindings.input:
destination: user-clicks3
spring.cloud.stream.bindings.inputTable:
spring.cloud.stream.bindings.process-in-0:
destination: user-clicks
spring.cloud.stream.bindings.process-in-1:
destination: user-regions
spring.cloud.stream.bindings.output:
spring.cloud.stream.bindings.process-out-0:
destination: output-topic
spring.cloud.stream.kafka.streams.bindings.inputTable:
spring.cloud.stream.kafka.streams.bindings.process-in-1:
consumer:
materializedAs: all-regions
spring.cloud.stream.kafka.streams.binder:
brokers: localhost
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
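The renamed bindings follow the functional naming convention: for a bean named `process`, inputs are `process-in-<index>` in argument order and outputs are `process-out-<index>`. So `process-in-0` is the `KStream` of clicks, `process-in-1` is the regions `GlobalKTable` (materialized as the `all-regions` store via `materializedAs`), and `process-out-0` is the joined output. Purely for illustration, the same configuration could be supplied programmatically; the launcher class below is hypothetical and not part of the sample:

```java
// Hypothetical launcher passing the binding properties from application.yml programmatically.
import org.springframework.boot.builder.SpringApplicationBuilder;

public class GlobalKTableJoinLauncher {

	public static void main(String[] args) {
		new SpringApplicationBuilder(KafkaStreamsGlobalKTableJoin.class)
				.properties(
						"spring.cloud.stream.bindings.process-in-0.destination=user-clicks",
						"spring.cloud.stream.bindings.process-in-1.destination=user-regions",
						"spring.cloud.stream.bindings.process-out-0.destination=output-topic",
						"spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs=all-regions",
						"spring.cloud.stream.kafka.streams.binder.brokers=localhost")
				.run(args);
	}
}
```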