From 68b0614638d1f0883fb3ec4db421826576d66892 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 18 Jun 2019 14:06:15 -0400 Subject: [PATCH] Update kafka streams samples Address Serde changes from upstream Polishing --- .../stream/ShippingKStreamConfiguration.kt | 2 ++ .../src/main/resources/application.yml | 11 ++++++---- .../join/KafkaStreamsAggregateSample.java | 8 +++---- .../KafkaStreamsBranchingSample.java | 5 +---- .../src/main/resources/application.yml | 4 ---- .../src/main/resources/application.yml | 11 +++++----- .../main/resources/by-framework-decoding.yml | 6 ++---- .../src/main/resources/application.yml | 19 +---------------- .../README.adoc | 4 ++-- .../src/main/resources/application.yml | 11 +++------- .../src/main/resources/application.yml | 13 +----------- .../kafka-streams-message-channel/README.adoc | 2 +- .../KafkaStreamsWordCountApplication.java | 6 +----- .../src/main/resources/application.yml | 5 ++--- .../kafka-streams-product-tracker/README.adoc | 11 ++-------- ...KafkaStreamsProductTrackerApplication.java | 5 +---- .../src/main/resources/application.yml | 11 ++-------- .../src/main/resources/application.yml | 21 +------------------ .../src/main/resources/application.yml | 1 - .../src/main/resources/application.yml | 1 - 20 files changed, 38 insertions(+), 119 deletions(-) diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt index c61bb4b..10d8e87 100644 --- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt +++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt @@ -56,6 +56,8 @@ class ShippingKStreamConfiguration { val orderShippedSerde = SpecificAvroSerde() orderShippedSerde.configure(serdeConfig, false) + + val 
stateStore: Materialized> = Materialized.`as`>("customer-store") .withKeySerde(intSerde) diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml index 32eb452..c75c840 100644 --- a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml +++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml @@ -1,3 +1,6 @@ +spring.cloud.stream.bindings.input.consumer.useNativeDecoding: false +spring.cloud.stream.bindings.order.consumer.useNativeDecoding: false +spring.cloud.stream.bindings.output.producer.useNativeEncoding: false spring: application: name: shipping-service @@ -14,10 +17,10 @@ spring: default: key: serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde - bindings: - input: - consumer: - valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde +# bindings: +# input: +# consumer: +# valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde bindings: input: destination: customer diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java index 01e5c71..1140cc3 100644 --- a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java +++ b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java @@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry; +import 
org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -41,7 +41,7 @@ import org.springframework.web.bind.annotation.RestController; public class KafkaStreamsAggregateSample { @Autowired - private QueryableStoreRegistry queryableStoreRegistry; + private InteractiveQueryService interactiveQueryService; public static void main(String[] args) { SpringApplication.run(KafkaStreamsAggregateSample.class, args); @@ -51,7 +51,7 @@ public class KafkaStreamsAggregateSample { public static class KafkaStreamsAggregateSampleApplication { @StreamListener("input") - public void process(KStream input) { + public void process(KStream input) { ObjectMapper mapper = new ObjectMapper(); Serde domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper ); @@ -75,7 +75,7 @@ public class KafkaStreamsAggregateSample { public String events() { final ReadOnlyKeyValueStore topFiveStore = - queryableStoreRegistry.getQueryableStoreType("test-events-snapshots", QueryableStoreTypes.keyValueStore()); + interactiveQueryService.getQueryableStore("test-events-snapshots", QueryableStoreTypes.keyValueStore()); return topFiveStore.get("12345"); } } diff --git a/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java b/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java index b839ec2..816a426 100644 --- a/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java +++ b/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java @@ -43,9 +43,6 @@ public class KafkaStreamsBranchingSample { @EnableBinding(KStreamProcessorX.class) public static class 
WordCountProcessorApplication { - @Autowired - private TimeWindows timeWindows; - @StreamListener("input") @SendTo({"output1","output2","output3"}) @SuppressWarnings("unchecked") @@ -58,7 +55,7 @@ public class KafkaStreamsBranchingSample { return input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, value) -> value) - .windowedBy(timeWindows) + .windowedBy(TimeWindows.of(60_000)) .count(Materialized.as("WordCounts-1")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))) diff --git a/kafka-streams-samples/kafka-streams-branching/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-branching/src/main/resources/application.yml index c461557..f396270 100644 --- a/kafka-streams-samples/kafka-streams-branching/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-branching/src/main/resources/application.yml @@ -1,6 +1,3 @@ -spring.cloud.stream.bindings.output1.contentType: application/json -spring.cloud.stream.bindings.output2.contentType: application/json -spring.cloud.stream.bindings.output3.contentType: application/json spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 spring.cloud.stream.kafka.streams.binder.configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde @@ -13,7 +10,6 @@ spring.cloud.stream.bindings.output3: destination: spanish-counts spring.cloud.stream.bindings.input: destination: words - group: group1 spring.cloud.stream.kafka.streams.binder: brokers: localhost #192.168.99.100 #localhost spring.application.name: kafka-streams-branching-sample \ No newline at end of file diff --git a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/application.yml index 143ce1e..3d241aa 100644 --- 
a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/application.yml @@ -1,20 +1,19 @@ -spring.cloud.stream.bindings.output.contentType: application/json spring.cloud.stream.kafka.streams.binder: configuration: commit.interval.ms: 1000 default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde - application.id: default - brokers: localhost #192.168.99.100 + application.id: dlq-1 + brokers: localhost serdeError: sendToDlq spring.cloud.stream.bindings.output: destination: counts spring.cloud.stream.bindings.input: destination: words group: group1 - consumer: - useNativeDecoding: true -spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq +spring.cloud.stream.kafka.streams.bindings.input.consumer: + dlqName: words-count-dlq + valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde diff --git a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/by-framework-decoding.yml b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/by-framework-decoding.yml index 217c222..1a1e780 100644 --- a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/by-framework-decoding.yml +++ b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/resources/by-framework-decoding.yml @@ -6,18 +6,16 @@ spring.cloud.stream.kafka.streams.binder: commit.interval.ms: 1000 default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde - application.id: default + application.id: dlq-2 spring.cloud.stream.bindings.output: destination: counts producer: - headerMode: raw - #useNativeEncoding: true + useNativeEncoding: false spring.cloud.stream.bindings.input: contentType: foo/bar destination: words group: group1 consumer: - headerMode: 
raw useNativeDecoding: false spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq diff --git a/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml index ddd621b..49555dd 100644 --- a/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-global-table-join/src/main/resources/application.yml @@ -1,32 +1,15 @@ spring.application.name: stream-global-table-sample spring.cloud.stream.bindings.input: destination: user-clicks3 - consumer: - useNativeDecoding: true spring.cloud.stream.bindings.inputTable: destination: user-regions - contentType: application/avro - consumer: - useNativeDecoding: true spring.cloud.stream.bindings.output: destination: output-topic - producer: - useNativeEncoding: true -spring.cloud.stream.kafka.streams.bindings.input: - consumer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde - valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde spring.cloud.stream.kafka.streams.bindings.inputTable: consumer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde - valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde materializedAs: all-regions -spring.cloud.stream.kafka.streams.bindings.output: - producer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde - valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde spring.cloud.stream.kafka.streams.binder: - brokers: localhost #192.168.99.100 + brokers: localhost configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde diff --git a/kafka-streams-samples/kafka-streams-interactive-query-advanced/README.adoc 
b/kafka-streams-samples/kafka-streams-interactive-query-advanced/README.adoc index 7f0736f..e44e1b6 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-advanced/README.adoc +++ b/kafka-streams-samples/kafka-streams-interactive-query-advanced/README.adoc @@ -29,12 +29,12 @@ For more information on how this is done, please take a look at the application 5. Run the stand-alone `Producers` application to generate data and start the processing. Keep it running for a while. -6. Go to the URL: http://localhost:8080/charts/top-five?genre=Punk +6. Go to the URL: http://localhost:8080/charts/top-five?genre=Punk keep refreshing the URL and you will see the song play count information changes. Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8080 that serves this request. -7. Go to the URL: http://localhost:8082/charts/top-five?genre=Punk +7. Go to the URL: http://localhost:8082/charts/top-five?genre=Punk Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8082 that serves this request. 
diff --git a/kafka-streams-samples/kafka-streams-interactive-query-advanced/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-interactive-query-advanced/src/main/resources/application.yml index 9e8c880..772809b 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-advanced/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-interactive-query-advanced/src/main/resources/application.yml @@ -1,26 +1,21 @@ spring.cloud.stream.bindings.input: destination: play-events - consumer: - useNativeDecoding: true spring.cloud.stream.bindings.inputX: destination: song-feed - consumer: - useNativeDecoding: true spring.cloud.stream.kafka.streams.bindings.input: consumer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde spring.cloud.stream.kafka.streams.bindings.inputX: consumer: - keySerde: org.apache.kafka.common.serialization.Serdes$LongSerde valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde materializedAs: all-songs spring.cloud.stream.kafka.streams.binder: - brokers: localhost #192.168.99.100 + brokers: localhost configuration: schema.registry.url: http://localhost:8081 commit.interval.ms: 1000 spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true spring.cloud.stream.kafka.streams.binder.minPartitionCount: 4 spring.cloud.stream.kafka.streams.binder.configuration.application.server: localhost:8080 -spring.applicaiton.name: kafka-streams-iq-advanced-sample \ No newline at end of file + +spring.application.name: kafka-streams-iq-advanced-sample \ No newline at end of file diff --git a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml index ab03fc5..8802443 100644 --- a/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml 
+++ b/kafka-streams-samples/kafka-streams-interactive-query-basic/src/main/resources/application.yml @@ -1,19 +1,8 @@ -spring.cloud.stream.bindings.output.contentType: application/json spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 -spring.cloud.stream.kafka.streams: - binder.configuration: - key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde - value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde - bindings.output.producer: - keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde - valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde spring.cloud.stream.bindings.output: destination: product-counts - producer: - useNativeEncoding: true spring.cloud.stream.bindings.input: destination: products - consumer: spring.cloud.stream.kafka.streams.binder: - brokers: localhost #192.168.99.100 + brokers: localhost spring.application.name: kafka-streams-iq-basic-sample diff --git a/kafka-streams-samples/kafka-streams-message-channel/README.adoc b/kafka-streams-samples/kafka-streams-message-channel/README.adoc index 704bb2e..7370f5e 100644 --- a/kafka-streams-samples/kafka-streams-message-channel/README.adoc +++ b/kafka-streams-samples/kafka-streams-message-channel/README.adoc @@ -13,7 +13,7 @@ Go to the root of the repository and do: `./mvnw clean package` -`java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kafka.streams.timeWindow.length=60000` +`java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar` Issue the following commands: diff --git a/kafka-streams-samples/kafka-streams-message-channel/src/main/java/kafka/streams/message/channel/KafkaStreamsWordCountApplication.java b/kafka-streams-samples/kafka-streams-message-channel/src/main/java/kafka/streams/message/channel/KafkaStreamsWordCountApplication.java index d0663bb..53c4b8f 100644 --- 
a/kafka-streams-samples/kafka-streams-message-channel/src/main/java/kafka/streams/message/channel/KafkaStreamsWordCountApplication.java +++ b/kafka-streams-samples/kafka-streams-message-channel/src/main/java/kafka/streams/message/channel/KafkaStreamsWordCountApplication.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @@ -45,9 +44,6 @@ public class KafkaStreamsWordCountApplication { @EnableBinding(MultipleProcessor.class) public static class WordCountProcessorApplication { - @Autowired - private TimeWindows timeWindows; - @StreamListener("binding2") @SendTo("singleOutput") public KStream process(KStream input) { @@ -56,7 +52,7 @@ public class KafkaStreamsWordCountApplication { .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(timeWindows) + .windowedBy(TimeWindows.of(60_000)) .count(Materialized.as("WordCounts-1")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); diff --git a/kafka-streams-samples/kafka-streams-message-channel/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-message-channel/src/main/resources/application.yml index 61a2e8a..d893d6a 100644 --- a/kafka-streams-samples/kafka-streams-message-channel/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-message-channel/src/main/resources/application.yml @@ -1,4 +1,3 @@ 
-spring.cloud.stream.bindings.singleOutput.contentType: application/json spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 spring.cloud.stream.kafka.streams.binder.configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde @@ -10,7 +9,7 @@ spring.cloud.stream.bindings.binding2: spring.cloud.stream.bindings.binding1: destination: words spring.cloud.stream.kafka.streams.binder: - brokers: localhost #192.168.99.100 -spring.applicaiton.name: kafka-streams-message-channel-sample + brokers: localhost +spring.application.name: kafka-streams-message-channel-sample diff --git a/kafka-streams-samples/kafka-streams-product-tracker/README.adoc b/kafka-streams-samples/kafka-streams-product-tracker/README.adoc index 38445b9..f5783c6 100644 --- a/kafka-streams-samples/kafka-streams-product-tracker/README.adoc +++ b/kafka-streams-samples/kafka-streams-product-tracker/README.adoc @@ -15,7 +15,7 @@ Go to the root of the repository and do: `./mvnw clean package` -`java -jar target/kafka-streams-product-tracker-0.0.1-SNAPSHOT.jar --app.product.tracker.productIds=123,124,125 --spring.cloud.stream.kafka.streams.timeWindow.length=60000 --spring.cloud.stream.kafka.streams.timeWindow.advanceBy=30000` +`java -jar target/kafka-streams-product-tracker-0.0.1-SNAPSHOT.jar --app.product.tracker.productIds=123,124,125` The above command will track products with ID's 123,124 and 125 every 30 seconds with the counts from the last minute. In other words, every 30 seconds a new 1 minute window is started. @@ -37,11 +37,4 @@ Enter the following in the console producer (one line at a time) and watch the o {"id":"123"} {"id":"123"} {"id":"123"} -``` - -The default time window is configured for 30 seconds and you can change that using the following property. 
- -`spring.cloud.stream.kafka.streams.timeWindow.length` (value is expressed in milliseconds) - -In order to switch to a hopping window, you can use the `spring.cloud.stream.kafka.streams.timeWindow.advanceBy` (value in milliseconds). -This will create an overlapped hopping windows depending on the value you provide. +``` \ No newline at end of file diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java index c1b573b..7d36391 100644 --- a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java +++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java @@ -53,9 +53,6 @@ public class KafkaStreamsProductTrackerApplication { @Autowired ProductTrackerProperties productTrackerProperties; - @Autowired - TimeWindows timeWindows; - @StreamListener("input") @SendTo("output") public KStream process(KStream input) { @@ -63,7 +60,7 @@ public class KafkaStreamsProductTrackerApplication { .filter((key, product) -> productIds().contains(product.getId())) .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))) - .windowedBy(timeWindows) + .windowedBy(TimeWindows.of(60_000)) .count(Materialized.as("product-counts")) .toStream() .map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id, diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml index dbb896a..f4c881b 100644 --- 
a/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/resources/application.yml @@ -1,15 +1,8 @@ -spring.cloud.stream.bindings.output.contentType: application/json spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 -spring.cloud.stream.kafka.streams: - binder.configuration: - key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde - value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde - bindings.output.producer: - keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde spring.cloud.stream.bindings.output: destination: product-counts spring.cloud.stream.bindings.input: destination: products spring.cloud.stream.kafka.streams.binder: - brokers: localhost #192.168.99.100 -spring.applicaiton.name: kafka-streams-product-tracker-sample \ No newline at end of file + brokers: localhost +spring.application.name: kafka-streams-product-tracker-sample \ No newline at end of file diff --git a/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml index e1d85b3..7a1670f 100644 --- a/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-table-join/src/main/resources/application.yml @@ -1,31 +1,12 @@ spring.application.name: stream-table-sample spring.cloud.stream.bindings.input: destination: user-clicks3 - consumer: - useNativeDecoding: true spring.cloud.stream.bindings.inputTable: destination: user-regions - contentType: application/avro - consumer: - useNativeDecoding: true spring.cloud.stream.bindings.output: destination: output-topic - producer: - useNativeEncoding: true -spring.cloud.stream.kafka.streams.bindings.input: - consumer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde - valueSerde: 
org.apache.kafka.common.serialization.Serdes$LongSerde -spring.cloud.stream.kafka.streams.bindings.inputTable: - consumer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde - valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde -spring.cloud.stream.kafka.streams.bindings.output: - producer: - keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde - valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde spring.cloud.stream.kafka.streams.binder: - brokers: localhost #192.168.99.100 + brokers: localhost configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde diff --git a/kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/src/main/resources/application.yml index cb66c11..f2541d4 100644 --- a/kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/src/main/resources/application.yml @@ -1,4 +1,3 @@ -spring.cloud.stream.bindings.singleOutput.contentType: application/json spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 spring.cloud.stream.kafka.streams: binder.configuration: diff --git a/kafka-streams-samples/kafka-streams-word-count/src/main/resources/application.yml b/kafka-streams-samples/kafka-streams-word-count/src/main/resources/application.yml index 60a6ca7..0f5f0a0 100644 --- a/kafka-streams-samples/kafka-streams-word-count/src/main/resources/application.yml +++ b/kafka-streams-samples/kafka-streams-word-count/src/main/resources/application.yml @@ -1,4 +1,3 @@ -spring.cloud.stream.bindings.output.contentType: application/json spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000 spring.cloud.stream.kafka.streams: 
binder.configuration: