spring-cloud-stream-samples/kafka-streams-samples/kafka-streams-dlq-sample
Soby Chacko 24997117d4 Spring Boot and Cloud updates
Update samples to use Boot 2.4.4
Spring Cloud is updated to 2020.0.2

Remove empty integration tests
2021-03-24 22:47:17 -04:00

== What is this app?

This is an example of a Spring Cloud Stream processor using Kafka Streams support.

It demonstrates how the Kafka Streams binder handles deserialization errors by sending the failed records to a dead-letter queue (DLQ).

The example is based on the word count application from the https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java[reference documentation].
It uses a single input and a single output.
In essence, the application receives text messages from an input topic, computes word occurrence counts over a configurable time window, and reports them to an output topic.
This sample uses lambda expressions and thus requires Java 8+.
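
To make the windowed counting concrete, here is a minimal plain-Java sketch of the idea. It is not the sample's code, which relies on the Kafka Streams DSL; the class `WindowedWordCountSketch`, the `Message` type, and the 30-second window are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Kafka involved.
class WindowedWordCountSketch {

    // A text message with its timestamp (hypothetical type for this sketch).
    static final class Message {
        final long timestampMs;
        final String text;

        Message(long timestampMs, String text) {
            this.timestampMs = timestampMs;
            this.text = text;
        }
    }

    // Returns window start -> (word -> count) for fixed-size, non-overlapping windows.
    // Assumes non-negative timestamps.
    static Map<Long, Map<String, Long>> count(List<Message> messages, long windowSizeMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Message m : messages) {
            // Align the timestamp to the start of its window.
            long windowStart = m.timestampMs - (m.timestampMs % windowSizeMs);
            Map<String, Long> counts = result.computeIfAbsent(windowStart, k -> new TreeMap<>());
            for (String word : m.text.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                new Message(1_000L, "hello world"),
                new Message(2_000L, "hello again"),
                new Message(31_000L, "hello"));
        // prints {0={again=1, hello=2, world=1}, 30000={hello=1}}
        System.out.println(count(messages, 30_000L));
    }
}
```

The third message falls into a new window, so its `hello` is counted separately from the first two.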

=== Running the app:

`docker-compose up -d`

Go to the root of the repository and run: `./mvnw clean package`

`java -jar target/kafka-streams-dlq-sample-0.0.1-SNAPSHOT.jar`

The default `application.yml` file demonstrates native decoding by Kafka.
The default value `Serde` is set to `IntegerSerde` to force deserialization errors.

Issue the following commands:

`docker exec -it kafka-dlq /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic words`

On another terminal:

`docker exec -it kafka-dlq /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic counts`

On another terminal:

`docker exec -it kafka-dlq /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic words-count-dlq`

On the console producer, enter some text data.
You will see that the messages produce deserialization errors and end up in the DLQ topic, `words-count-dlq`.
You will not see any messages arriving at the regular destination, `counts`.

The application's configuration sets the value `Serde` to `Integer`, which is why the deserialization errors occur.
The application expects numerical data, but the producer sends regular text.
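
The behavior described above can be sketched in plain Java. This is an in-memory illustration under stated assumptions, not the binder's implementation: `DlqRoutingSketch`, `decodeInt`, and `Routed` are hypothetical names, and the lists stand in for Kafka topics. The length check mirrors what Kafka's `IntegerDeserializer` is understood to do, namely reject any payload that is not exactly 4 bytes long.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory illustration only; the lists stand in for Kafka topics.
class DlqRoutingSketch {

    // Mimics an integer deserializer's length check: only 4-byte payloads are accepted.
    static int decodeInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt(); // big-endian by default
    }

    // Values that decoded successfully, and raw values that did not.
    static final class Routed {
        final List<Integer> decoded = new ArrayList<>();
        final List<byte[]> deadLetters = new ArrayList<>();
    }

    static Routed route(List<byte[]> rawValues) {
        Routed routed = new Routed();
        for (byte[] raw : rawValues) {
            try {
                routed.decoded.add(decodeInt(raw));
            } catch (IllegalArgumentException e) {
                // Keep the original bytes so the failed record can be inspected later.
                routed.deadLetters.add(raw);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        List<byte[]> input = Arrays.asList(
                "hello world".getBytes(StandardCharsets.UTF_8), // 11 bytes: rejected
                ByteBuffer.allocate(4).putInt(42).array());      // 4 bytes: accepted
        Routed routed = route(input);
        // prints decoded=[42], deadLetters=1
        System.out.println("decoded=" + routed.decoded + ", deadLetters=" + routed.deadLetters.size());
    }
}
```

One consequence of a pure length check is that a text value that happens to be exactly 4 bytes long would pass it, so longer or shorter input is the more reliable way to trigger the error path when trying this out.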