From ca18443d8d3b72435077e19c39c5528e682f0b2e Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 28 Oct 2019 21:33:24 -0400 Subject: [PATCH] Update globalktable binding sample --- .../README.adoc | 6 +- .../docker-compose.yml | 2 +- .../kafka-streams-global-table-join/pom.xml | 100 +++++++++++++++++- .../join/KafkaStreamsGlobalKTableJoin.java | 28 ++--- .../streams/globalktable/join/Producers.java | 2 +- .../src/main/resources/application.yml | 11 +- 6 files changed, 112 insertions(+), 37 deletions(-) diff --git a/kafka-streams-samples/kafka-streams-global-table-join/README.adoc b/kafka-streams-samples/kafka-streams-global-table-join/README.adoc index ee39bb9..a0ee34c 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/README.adoc +++ b/kafka-streams-samples/kafka-streams-global-table-join/README.adoc @@ -15,11 +15,9 @@ Go to the root of the repository. `java -jar target/kafka-streams-global-table-join-0.0.1-SNAPSHOT.jar` -`docker exec -it kafka-join /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic user-regions` +`docker exec -it kafka-join-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic user-regions --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-"` -`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic user-regions --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-"` - -`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.separator="-"` +`docker exec -it kafka-join-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.separator="-"` Run the stand-alone `Producers` application to generate some data and watch the output on the console consumer above. \ No newline at end of file diff --git a/kafka-streams-samples/kafka-streams-global-table-join/docker-compose.yml b/kafka-streams-samples/kafka-streams-global-table-join/docker-compose.yml index f241d8f..cf6cec9 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/docker-compose.yml +++ b/kafka-streams-samples/kafka-streams-global-table-join/docker-compose.yml @@ -2,7 +2,7 @@ version: '3' services: kafka: image: wurstmeister/kafka - container_name: kafka-join + container_name: kafka-join-1 ports: - "9092:9092" environment: diff --git a/kafka-streams-samples/kafka-streams-global-table-join/pom.xml b/kafka-streams-samples/kafka-streams-global-table-join/pom.xml index 322e7d6..801dd88 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/pom.xml +++ b/kafka-streams-samples/kafka-streams-global-table-join/pom.xml @@ -11,23 +11,62 @@ Demo project for Spring Boot - io.spring.cloud.stream.sample - spring-cloud-stream-samples-parent - 0.0.1-SNAPSHOT - ../.. + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + Hoxton.BUILD-SNAPSHOT + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + org.springframework.cloud spring-cloud-stream-binder-kafka-streams - org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.apache.kafka + kafka-streams-test-utils + ${kafka.version} + test + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + @@ -39,4 +78,55 @@ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + diff --git a/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/KafkaStreamsGlobalKTableJoin.java b/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/KafkaStreamsGlobalKTableJoin.java index e5205fb..54dc327 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/KafkaStreamsGlobalKTableJoin.java +++ b/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/KafkaStreamsGlobalKTableJoin.java @@ -16,18 +16,16 @@ package kafka.streams.globalktable.join; +import java.util.function.BiFunction; + import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Serialized; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor; -import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.context.annotation.Bean; /** * This is the PR that added this sample: @@ -40,34 +38,24 @@ public class KafkaStreamsGlobalKTableJoin { SpringApplication.run(KafkaStreamsGlobalKTableJoin.class, args); } - @EnableBinding(KStreamProcessorX.class) public static class KStreamToTableJoinApplication { - @StreamListener - @SendTo("output") - public KStream process(@Input("input") KStream userClicksStream, - @Input("inputTable") GlobalKTable userRegionsTable) { + @Bean + public BiFunction, GlobalKTable, KStream> process() { - return userClicksStream + return (userClicksStream, userRegionsTable) -> userClicksStream .leftJoin(userRegionsTable, (name,value) -> name, (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks) ) .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.Long())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.Long())) .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks) .toStream(); } } - - interface KStreamProcessorX extends KafkaStreamsProcessor { - - @Input("inputTable") - GlobalKTable inputKTable(); - } - private static final class RegionWithClicks { private final String region; diff --git a/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/Producers.java b/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/Producers.java index 3acca22..bf02404 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/Producers.java +++ b/kafka-streams-samples/kafka-streams-global-table-join/src/main/java/kafka/streams/globalktable/join/Producers.java @@ -57,7 +57,7 @@ public class Producers { DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(props); KafkaTemplate template = new KafkaTemplate<>(pf, true); - template.setDefaultTopic("user-clicks3"); + template.setDefaultTopic("user-clicks"); for (KeyValue keyValue : userClicks) { template.sendDefault(keyValue.key, keyValue.value); diff --git a/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml index 49555dd..eed45e2 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml @@ -1,15 +1,14 @@ spring.application.name: stream-global-table-sample -spring.cloud.stream.bindings.input: - destination: user-clicks3 -spring.cloud.stream.bindings.inputTable: +spring.cloud.stream.bindings.process-in-0: + destination: user-clicks +spring.cloud.stream.bindings.process-in-1: destination: user-regions -spring.cloud.stream.bindings.output: +spring.cloud.stream.bindings.process-out-0: destination: output-topic -spring.cloud.stream.kafka.streams.bindings.inputTable: +spring.cloud.stream.kafka.streams.bindings.process-in-1: consumer: materializedAs: all-regions spring.cloud.stream.kafka.streams.binder: - brokers: localhost configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde