== What is this app?
This is an example of a Spring Cloud Stream processor that uses the Kafka Streams support.

The example is based on the word count application from the https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java[reference documentation].
The application has two inputs: one bound as a KStream and the other bound to a message channel.

=== Running the app
Go to the root of the repository and run:

`docker-compose up -d`

`./mvnw clean package`

`java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar`

Start a console producer on the `words` topic:

`docker exec -it kafka-mc /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic words`

In another terminal, start a console consumer on the `counts` topic:

`docker exec -it kafka-mc /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic counts`

Enter some text in the console producer and watch the output in the console consumer.
Also watch the application console for logging statements from the regular sink `StreamListener` method.

The default time window is 5 seconds. You can change it with the following property:

`spring.cloud.stream.kafka.streams.timeWindow.length` (value in milliseconds)

To switch to a hopping window, also set `spring.cloud.stream.kafka.streams.timeWindow.advanceBy` (value in milliseconds).
This creates overlapping hopping windows, depending on the value you provide.

Here is an example with 2 overlapping windows (a window length of 10 seconds and a hop (advance) of 5 seconds):

`java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kafka.streams.timeWindow.length=10000 --spring.cloud.stream.kafka.streams.timeWindow.advanceBy=5000`
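
To make the effect of the two window properties concrete, here is a minimal plain-Java sketch of how a word count is assigned to hopping windows. It is not the code of this sample and uses neither Kafka Streams nor Spring Cloud Stream. The class `HoppingWindowSketch` and its methods are hypothetical names, and the sketch assumes that windows start at multiples of the advance interval (counted from time 0) and cover the half-open range from their start up to start plus length.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; it does not use Kafka Streams or Spring Cloud Stream.
class HoppingWindowSketch {

    // Returns the start times (ms) of all windows that contain timestampMs.
    // Assumption: windows start at multiples of advanceByMs, counted from time 0,
    // and each covers [start, start + lengthMs).
    static List<Long> windowStartsFor(long timestampMs, long lengthMs, long advanceByMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (never below 0).
        long first = Math.max(0L, timestampMs - lengthMs + advanceByMs) / advanceByMs * advanceByMs;
        for (long start = first; start <= timestampMs; start += advanceByMs) {
            starts.add(start);
        }
        return starts;
    }

    // Adds one line of text to the per-window word counts (window start -> word -> count).
    static void addRecord(Map<Long, Map<String, Long>> counts, long timestampMs, String line,
                          long lengthMs, long advanceByMs) {
        for (long start : windowStartsFor(timestampMs, lengthMs, advanceByMs)) {
            Map<String, Long> perWindow = counts.computeIfAbsent(start, s -> new TreeMap<>());
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    perWindow.merge(word, 1L, Long::sum);
                }
            }
        }
    }

    public static void main(String[] args) {
        long lengthMs = 10_000;   // corresponds to timeWindow.length
        long advanceByMs = 5_000; // corresponds to timeWindow.advanceBy
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        addRecord(counts, 7_000, "hello kafka hello", lengthMs, advanceByMs);
        addRecord(counts, 12_000, "kafka", lengthMs, advanceByMs);
        counts.forEach((start, words) ->
                System.out.println("window starting at " + start + " ms: " + words));
    }
}
```

With a length of 10 seconds and an advance of 5 seconds, the record at 7000 ms is counted in the windows starting at 0 and 5000, and the record at 12000 ms in the windows starting at 5000 and 10000, so each word contributes to two overlapping counts. Setting the advance equal to the length yields non-overlapping windows, where every record falls into exactly one window.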