Update Kafka Streams samples

Address Serde changes from upstream
Polishing
Soby Chacko
2019-06-18 14:06:15 -04:00
parent aeb05a7515
commit 68b0614638
20 changed files with 38 additions and 119 deletions

View File

@@ -56,6 +56,8 @@ class ShippingKStreamConfiguration {
val orderShippedSerde = SpecificAvroSerde<OrderShippedEvent>()
orderShippedSerde.configure(serdeConfig, false)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
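For context, the same store wiring as a minimal Java sketch; it assumes `Customer` is an Avro-generated `SpecificRecord` class and a schema registry at `http://localhost:8081`, as in this sample:

```java
import java.util.Collections;
import java.util.Map;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class CustomerStoreConfig {

    Materialized<Integer, Customer, KeyValueStore<Bytes, byte[]>> customerStore() {
        Map<String, String> serdeConfig =
                Collections.singletonMap("schema.registry.url", "http://localhost:8081");

        // isKey = false: this serde deserializes record values, not keys.
        SpecificAvroSerde<Customer> customerSerde = new SpecificAvroSerde<>();
        customerSerde.configure(serdeConfig, false);

        Serde<Integer> intSerde = Serdes.Integer();

        // Declare the state store's key and value serdes explicitly
        // on the Materialized instance.
        return Materialized.<Integer, Customer, KeyValueStore<Bytes, byte[]>>as("customer-store")
                .withKeySerde(intSerde)
                .withValueSerde(customerSerde);
    }
}
```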

View File

@@ -1,3 +1,6 @@
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: false
spring.cloud.stream.bindings.order.consumer.useNativeDecoding: false
spring.cloud.stream.bindings.output.producer.useNativeEncoding: false
spring:
application:
name: shipping-service
@@ -14,10 +17,10 @@ spring:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
# bindings:
# input:
# consumer:
# valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
bindings:
input:
destination: customer

View File

@@ -32,7 +32,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -41,7 +41,7 @@ import org.springframework.web.bind.annotation.RestController;
public class KafkaStreamsAggregateSample {
@Autowired
private QueryableStoreRegistry queryableStoreRegistry;
private InteractiveQueryService interactiveQueryService;
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsAggregateSample.class, args);
@@ -51,7 +51,7 @@ public class KafkaStreamsAggregateSample {
public static class KafkaStreamsAggregateSampleApplication {
@StreamListener("input")
public void process(KStream<Object, DomainEvent> input) {
public void process(KStream<String, DomainEvent> input) {
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
@@ -75,7 +75,7 @@ public class KafkaStreamsAggregateSample {
public String events() {
final ReadOnlyKeyValueStore<String, String> topFiveStore =
queryableStoreRegistry.getQueryableStoreType("test-events-snapshots", QueryableStoreTypes.<String, String>keyValueStore());
interactiveQueryService.getQueryableStore("test-events-snapshots", QueryableStoreTypes.<String, String>keyValueStore());
return topFiveStore.get("12345");
}
}
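For context, the migrated controller in one piece; a minimal sketch assuming `spring-cloud-stream-binder-kafka-streams` is on the classpath, with the store name and key taken from the sample above:

```java
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
class EventsController {

    @Autowired
    private InteractiveQueryService interactiveQueryService;

    // InteractiveQueryService.getQueryableStore replaces
    // QueryableStoreRegistry.getQueryableStoreType: same store name and
    // QueryableStoreType arguments, resolved against the KafkaStreams
    // instances the binder manages.
    @RequestMapping("/events")
    public String events() {
        ReadOnlyKeyValueStore<String, String> store =
                interactiveQueryService.getQueryableStore(
                        "test-events-snapshots",
                        QueryableStoreTypes.<String, String>keyValueStore());
        return store.get("12345");
    }
}
```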

View File

@@ -43,9 +43,6 @@ public class KafkaStreamsBranchingSample {
@EnableBinding(KStreamProcessorX.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
@SuppressWarnings("unchecked")
@@ -58,7 +55,7 @@ public class KafkaStreamsBranchingSample {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.windowedBy(TimeWindows.of(60_000))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))

View File

@@ -1,6 +1,3 @@
spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/json
spring.cloud.stream.bindings.output3.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
@@ -13,7 +10,6 @@ spring.cloud.stream.bindings.output3:
destination: spanish-counts
spring.cloud.stream.bindings.input:
destination: words
group: group1
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100 #localhost
spring.application.name: kafka-streams-branching-sample

View File

@@ -1,20 +1,19 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
application.id: default
brokers: localhost #192.168.99.100
application.id: dlq-1
brokers: localhost
serdeError: sendToDlq
spring.cloud.stream.bindings.output:
destination: counts
spring.cloud.stream.bindings.input:
destination: words
group: group1
consumer:
useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq
spring.cloud.stream.kafka.streams.bindings.input.consumer:
dlqName: words-count-dlq
valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde

View File

@@ -6,18 +6,16 @@ spring.cloud.stream.kafka.streams.binder:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
application.id: default
application.id: dlq-2
spring.cloud.stream.bindings.output:
destination: counts
producer:
headerMode: raw
#useNativeEncoding: true
useNativeEncoding: false
spring.cloud.stream.bindings.input:
contentType: foo/bar
destination: words
group: group1
consumer:
headerMode: raw
useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq

View File

@@ -1,32 +1,15 @@
spring.application.name: stream-global-table-sample
spring.cloud.stream.bindings.input:
destination: user-clicks3
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.inputTable:
destination: user-regions
contentType: application/avro
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeEncoding: true
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.inputTable:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
materializedAs: all-regions
spring.cloud.stream.kafka.streams.bindings.output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
brokers: localhost
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

View File

@@ -29,12 +29,12 @@ For more information on how this is done, please take a look at the application
5. Run the stand-alone `Producers` application to generate data and start the processing.
Keep it running for a while.
6. Go to the URL: http://localhost:8080/charts/top-five?genre=Punk
Keep refreshing the URL and you will see the song play count information change.
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8080 that serves this request.
7. Go to the URL: http://localhost:8082/charts/top-five?genre=Punk
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8082 that serves this request.
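This works because each instance advertises itself through the `application.server` property (see the config above), and `InteractiveQueryService` can report which instance hosts a given key. A minimal sketch; the store name `top-five-songs-by-genre` is illustrative and should match the store the sample materializes:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.HostInfo;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;

class HostLookup {

    private final InteractiveQueryService interactiveQueryService;

    HostLookup(InteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    // Resolves the host:port (from each instance's application.server
    // config) of the instance whose local state store holds this genre.
    String ownerOf(String genre) {
        HostInfo hostInfo = interactiveQueryService.getHostInfo(
                "top-five-songs-by-genre", genre, Serdes.String().serializer());
        return hostInfo.host() + ":" + hostInfo.port();
    }
}
```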

View File

@@ -1,26 +1,21 @@
spring.cloud.stream.bindings.input:
destination: play-events
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.inputX:
destination: song-feed
consumer:
useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.inputX:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$LongSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
materializedAs: all-songs
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
brokers: localhost
configuration:
schema.registry.url: http://localhost:8081
commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.kafka.streams.binder.minPartitionCount: 4
spring.cloud.stream.kafka.streams.binder.configuration.application.server: localhost:8080
spring.applicaiton.name: kafka-streams-iq-advanced-sample
spring.application.name: kafka-streams-iq-advanced-sample

View File

@@ -1,19 +1,8 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.output.producer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.bindings.output:
destination: product-counts
producer:
useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: products
consumer:
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
brokers: localhost
spring.application.name: kafka-streams-iq-basic-sample

View File

@@ -13,7 +13,7 @@ Go to the root of the repository and do:
`./mvnw clean package`
`java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kafka.streams.timeWindow.length=60000`
`java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar`
Issue the following commands:

View File

@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@@ -45,9 +44,6 @@ public class KafkaStreamsWordCountApplication {
@EnableBinding(MultipleProcessor.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("binding2")
@SendTo("singleOutput")
public KStream<?, WordCount> process(KStream<Object, String> input) {
@@ -56,7 +52,7 @@ public class KafkaStreamsWordCountApplication {
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.windowedBy(TimeWindows.of(60_000))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));

View File

@@ -1,4 +1,3 @@
spring.cloud.stream.bindings.singleOutput.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
@@ -10,7 +9,7 @@ spring.cloud.stream.bindings.binding2:
spring.cloud.stream.bindings.binding1:
destination: words
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
spring.applicaiton.name: kafka-streams-message-channel-sample
brokers: localhost
spring.application.name: kafka-streams-message-channel-sample

View File

@@ -15,7 +15,7 @@ Go to the root of the repository and do:
`./mvnw clean package`
`java -jar target/kafka-streams-product-tracker-0.0.1-SNAPSHOT.jar --app.product.tracker.productIds=123,124,125 --spring.cloud.stream.kafka.streams.timeWindow.length=60000 --spring.cloud.stream.kafka.streams.timeWindow.advanceBy=30000`
`java -jar target/kafka-streams-product-tracker-0.0.1-SNAPSHOT.jar --app.product.tracker.productIds=123,124,125`
The above command will track products with IDs 123, 124, and 125 every 30 seconds with the counts from the last minute.
In other words, every 30 seconds a new 1 minute window is started.
@@ -38,10 +38,3 @@ Enter the following in the console producer (one line at a time) and watch the o
{"id":"123"}
{"id":"123"}
```
The default time window is configured for 30 seconds, and you can change that using the following property.
`spring.cloud.stream.kafka.streams.timeWindow.length` (value is expressed in milliseconds)
In order to switch to a hopping window, you can use the `spring.cloud.stream.kafka.streams.timeWindow.advanceBy` property (value in milliseconds).
This will create overlapping hopping windows, depending on the values you provide.
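For reference, the window shapes those properties used to configure, expressed directly with the `TimeWindows` API the updated sample now calls; a sketch using the sample's 60-second/30-second values:

```java
import org.apache.kafka.streams.kstream.TimeWindows;

class WindowDefs {

    // Tumbling: back-to-back, non-overlapping 60-second windows
    // (what TimeWindows.of(60_000) gives you).
    TimeWindows tumbling = TimeWindows.of(60_000);

    // Hopping: a new 60-second window opens every 30 seconds, so
    // consecutive windows overlap by half their length.
    TimeWindows hopping = TimeWindows.of(60_000).advanceBy(30_000);
}
```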

View File

@@ -53,9 +53,6 @@ public class KafkaStreamsProductTrackerApplication {
@Autowired
ProductTrackerProperties productTrackerProperties;
@Autowired
TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<Integer, ProductStatus> process(KStream<Object, Product> input) {
@@ -63,7 +60,7 @@ public class KafkaStreamsProductTrackerApplication {
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
.windowedBy(timeWindows)
.windowedBy(TimeWindows.of(60_000))
.count(Materialized.as("product-counts"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,

View File

@@ -1,15 +1,8 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.output.producer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.bindings.output:
destination: product-counts
spring.cloud.stream.bindings.input:
destination: products
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
spring.applicaiton.name: kafka-streams-product-tracker-sample
brokers: localhost
spring.application.name: kafka-streams-product-tracker-sample

View File

@@ -1,31 +1,12 @@
spring.application.name: stream-table-sample
spring.cloud.stream.bindings.input:
destination: user-clicks3
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.inputTable:
destination: user-regions
contentType: application/avro
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeEncoding: true
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.inputTable:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
brokers: localhost
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

View File

@@ -1,4 +1,3 @@
spring.cloud.stream.bindings.singleOutput.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:

View File

@@ -1,4 +1,3 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration: