diff --git a/recipes/recipe-12-custom-kafka-header-mapper.adoc b/recipes/recipe-12-custom-kafka-header-mapper.adoc new file mode 100644 index 0000000..c58c8cb --- /dev/null +++ b/recipes/recipe-12-custom-kafka-header-mapper.adoc @@ -0,0 +1,114 @@ +# Adding custom header mapper in Kafka + +## Problem Statement + +I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that? + +## Solution + +Under normal circumstances, this should be fine. + +Imagine, you have the following producer. + +``` +@Bean +public Supplier> supply() { + return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build(); +} +``` + +On the consumer side, you should still see the header "foo", and the following should not give you any issues. + +``` +@Bean +public Consumer> consume() { + return s -> { + final String foo = (String)s.getHeaders().get("foo"); + System.out.println(foo); + }; +} +``` + +If you provide a https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties[custom header mapper] in the application, then this won't work. +Let's say you have an empty `KafkaHeaderMapper` in the application. + +``` +@Bean +public KafkaHeaderMapper kafkaBinderHeaderMapper() { + return new KafkaHeaderMapper() { + @Override + public void fromHeaders(MessageHeaders headers, Headers target) { + + } + + @Override + public void toHeaders(Headers source, Map target) { + + } + }; +} +``` + +If that is your implementation, then you will miss the `foo` header on the consumer. +Chances are that, you may have some logic inside those `KafkaHeaderMapper` methods. +You need the following to populate the `foo` header. + +``` +@Bean +public KafkaHeaderMapper kafkaBinderHeaderMapper() { + return new KafkaHeaderMapper() { + @Override + public void fromHeaders(MessageHeaders headers, Headers target) { + final String foo = (String) headers.get("foo"); + target.add("foo", foo.getBytes()); + } + + @Override + public void toHeaders(Headers source, Map target) { + final Header foo = source.lastHeader("foo"); + target.put("foo", new String(foo.value())); + } + } +``` + +That will properly populate the `foo` header from the producer to consumer. + +## Special note on the id header + +In Spring Cloud Stream, the `id` header is a special header, but some applications may want to have special custom id headers - something like `custom-id` or `ID` or `Id`. +The first one (`custom-id`) will propagate without any custom header mapper from producer to consumer. +However, if you produce with a variant of the framework reserved `id` header - such as `ID`, `Id`, `iD` etc. then you will run into issues with the internals of the framework. +See this StackOverflow thread fore more context on this use case: https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive +In that case, you must use a custom `KafkaHeaderMapper` to map the case-sensitive id header. +For example, let's say you have the following producer. + +``` +@Bean +public Supplier> supply() { + return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build(); +} +``` + +The header `Id` above will be gone from the consuming side as it clashes with the framework `id` header. +You can provide a custom `KafkaHeaderMapper` to solve this issue. + +``` +@Bean +public KafkaHeaderMapper kafkaBinderHeaderMapper1() { + return new KafkaHeaderMapper() { + @Override + public void fromHeaders(MessageHeaders headers, Headers target) { + final String myId = (String) headers.get("Id"); + target.add("Id", myId.getBytes()); + } + + @Override + public void toHeaders(Headers source, Map target) { + final Header Id = source.lastHeader("Id"); + target.put("Id", new String(Id.value())); + } + }; +} +``` + +By doing this, both `id` and `Id` headers will be available from the producer to the consumer side. \ No newline at end of file