From 50b855243aeec3e951b2a03ffef0e9de4b1220cb Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 3 Dec 2018 16:44:42 -0500 Subject: [PATCH] Fix stream-table sample issues --- .../kafka/streams/table/join/KafkaStreamsTableJoin.java | 6 +++--- .../src/main/resources/application.yml | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java b/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java index a4a1724..b431941 100644 --- a/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java +++ b/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java @@ -44,7 +44,7 @@ public class KafkaStreamsTableJoin { @StreamListener @SendTo("output") public KStream process(@Input("input") KStream userClicksStream, - @Input("inputX") KTable userRegionsTable) { + @Input("inputTable") KTable userRegionsTable) { return userClicksStream .leftJoin(userRegionsTable, @@ -60,8 +60,8 @@ public class KafkaStreamsTableJoin { interface KStreamProcessorX extends KafkaStreamsProcessor { - @Input("inputX") - KTable inputX(); + @Input("inputTable") + KTable inputKTable(); } private static final class RegionWithClicks { diff --git a/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml index 40358e3..786a0bb 100644 --- a/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml @@ -1,10 +1,12 @@ +spring.application.name: stream-table-sample spring.cloud.stream.bindings.input: destination: user-clicks3 consumer: useNativeDecoding: true headerMode: raw -spring.cloud.stream.bindings.inputX: +spring.cloud.stream.bindings.inputTable: destination: user-regions + contentType: application/avro consumer: useNativeDecoding: true headerMode: raw @@ -17,7 +19,7 @@ spring.cloud.stream.kafka.streams.bindings.input: consumer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde -spring.cloud.stream.kafka.streams.bindings.inputX: +spring.cloud.stream.kafka.streams.bindings.inputTable: consumer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde