== Spring Cloud Stream, Kafka Streams and Schema Evolution in Action with Confluent Schema Registry Server!

This repo includes three Spring Boot applications that demonstrate schema evolution using the Spring Cloud Stream Kafka and Kafka Streams binders.
Producer V1 (`custom-confluent-producer1`), Producer V2 (`custom-confluent-producer2`), and Consumer (`kafka-streams-confluent-consumer`) are included in this project.

Both producers are standard Spring Cloud Stream sources that use the Kafka binder.
They both skip the framework-provided serialization on the outbound and rely on Kafka's native serialization mechanism instead.
The consumer app uses the Kafka Streams binder and likewise skips the binder-provided message deserialization, delegating that responsibility to native Kafka Streams.

The samples do *not* use the schema registry support provided by Spring Cloud Stream; instead, they use the Confluent Schema Registry.
The producers share the same serializer, and the consumer uses the corresponding deserializer:
the producers use the `SpecificAvroSerializer` and the consumer uses the `SpecificAvroSerde`. Please see the implementations for more details.

=== Running the application

Make sure you are in the directory `kafka-streams-schema-evolution`.

Build the project: `./mvnw clean package`

Ensure that you have Kafka running locally.
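
If you do not have a broker running yet, one way to start one is with the scripts bundled with the Confluent Platform. This is a minimal sketch assuming a local Confluent Platform installation with its default configuration files; run each command in its own terminal session:
[source,bash]
----
# Start ZooKeeper first, then the Kafka broker (each blocks its terminal)
<ROOT_OF_CONFLUENT_PLATFORM_INSTALLATION>/bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
<ROOT_OF_CONFLUENT_PLATFORM_INSTALLATION>/bin/kafka-server-start ./etc/kafka/server.properties
----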

- Start the Confluent Schema Registry server in a terminal session
[source,bash]
----
<ROOT_OF_CONFLUENT_PLATFORM_INSTALLATION>/bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
----

In order to run this sample, you need to set compatibility to `NONE` on the Confluent Schema Registry server.

`curl -X PUT http://127.0.0.1:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"`
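
To confirm the setting took effect, you can read the global compatibility level back from the registry's `GET /config` endpoint (assuming the default port 8081):
[source,bash]
----
# Should report the compatibility level as NONE
curl http://127.0.0.1:8081/config
----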

- Start `kafka-streams-confluent-consumer` on another terminal session
[source,bash]
----
java -jar kafka-streams-confluent-consumer/target/kafka-streams-confluent-consumer-0.0.1-SNAPSHOT.jar
----
- Start `custom-confluent-producer1` on another terminal session
[source,bash]
----
java -jar custom-confluent-producer1/target/custom-confluent-producer1-0.0.1-SNAPSHOT.jar
----
- Start `custom-confluent-producer2` on another terminal session
[source,bash]
----
java -jar custom-confluent-producer2/target/custom-confluent-producer2-0.0.1-SNAPSHOT.jar
----
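
Optionally, you can sanity-check that the broker is reachable and see which topics the binder has created. This is a sketch assuming the default broker address; the exact topic names depend on the destinations configured in each app:
[source,bash]
----
# List the topics on the local broker
<ROOT_OF_CONFLUENT_PLATFORM_INSTALLATION>/bin/kafka-topics --bootstrap-server localhost:9092 --list
----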

=== Sample Data
Both producers in the demonstration are _also_ REST controllers. We will hit the `/messages` endpoint on each producer
to POST sample data.

_Example:_
[source,bash]
----
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9010/messages
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9009/messages
curl -X POST http://localhost:9010/messages
----
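
If you want to see the raw Avro records the producers emit (including the `-v1`/`-v2` suffixed `id` fields discussed below), you can attach a Confluent console consumer to the topic. This is a sketch; `<topic-used-by-the-samples>` is a placeholder for the destination the producers are actually bound to (see each app's configuration):
[source,bash]
----
# Deserializes records using the schemas stored in the registry
<ROOT_OF_CONFLUENT_PLATFORM_INSTALLATION>/bin/kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic <topic-used-by-the-samples> \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081
----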

=== Output
The consumer application has a scheduled printer that logs the current counts every 30 seconds.

[source,bash,options=nowrap,subs=attributes]
----
2018-07-31 16:57:37.041  INFO 29393 --- [ask-scheduler-3] ication$$EnhancerBySpringCGLIB$$27af0d3a : Count for v1 is=10
2018-07-31 16:57:37.041  INFO 29393 --- [ask-scheduler-3] ication$$EnhancerBySpringCGLIB$$27af0d3a : Count for v2 is=12
2018-07-31 16:58:07.037  INFO 29393 --- [ask-scheduler-3] ication$$EnhancerBySpringCGLIB$$27af0d3a : Count for v1 is=10
2018-07-31 16:58:07.037  INFO 29393 --- [ask-scheduler-3] ication$$EnhancerBySpringCGLIB$$27af0d3a : Count for v2 is=12
----

NOTE: Note the suffix in each payload's `id` field. Each id is appended with `-v1` or `-v2`, indicating that it came from
`producer1` or `producer2` respectively.

=== What just happened?
The schema evolved on the `temperature` field: it is now split into two separate fields, `internalTemperature` and `externalTemperature`.
`producer1` produces payloads with only the `temperature` field, whereas `producer2` produces payloads with the
`internalTemperature` and `externalTemperature` fields.

The `consumer` is coded against a base schema that includes the split fields.

The `consumer` app can happily deserialize payloads with the `internalTemperature` and `externalTemperature` fields. However, when
a `producer1` payload arrives (which includes the `temperature` field), the schema evolution and compatibility checks are applied
automatically.

Because each payload also includes its schema version, the applications coordinate with the Confluent Schema Registry server and schema evolution happens behind the scenes. `temperature` is automatically mapped to the
`internalTemperature` field, since that is the field on which the `aliases` attribute is defined.
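
You can also ask the registry for the schema versions it now holds for the sample topic. This is a sketch, where `<topic>` stands for the destination the samples write to (the `-value` suffix is the registry's default subject naming strategy):
[source,bash]
----
# List all subjects, then the registered versions of the value schema for the sample topic
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/<topic>-value/versions
----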