Fix stream-table sample issues

This commit is contained in:
Soby Chacko
2018-12-03 16:44:42 -05:00
parent 1ce7d3d51b
commit 50b855243a
2 changed files with 7 additions and 5 deletions

View File

@@ -44,7 +44,7 @@ public class KafkaStreamsTableJoin {
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputX") KTable<String, String> userRegionsTable) {
@Input("inputTable") KTable<String, String> userRegionsTable) {
return userClicksStream
.leftJoin(userRegionsTable,
@@ -60,8 +60,8 @@ public class KafkaStreamsTableJoin {
interface KStreamProcessorX extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputX();
@Input("inputTable")
KTable<?, ?> inputKTable();
}
private static final class RegionWithClicks {

View File

@@ -1,10 +1,12 @@
spring.application.name: stream-table-sample
spring.cloud.stream.bindings.input:
destination: user-clicks3
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.inputX:
spring.cloud.stream.bindings.inputTable:
destination: user-regions
contentType: application/avro
consumer:
useNativeDecoding: true
headerMode: raw
@@ -17,7 +19,7 @@ spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.inputX:
spring.cloud.stream.kafka.streams.bindings.inputTable:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde