New recipe for Kafka custom header mapper
This commit is contained in:
114
recipes/recipe-12-custom-kafka-header-mapper.adoc
Normal file
114
recipes/recipe-12-custom-kafka-header-mapper.adoc
Normal file
@@ -0,0 +1,114 @@
|
||||
# Adding custom header mapper in Kafka
|
||||
|
||||
## Problem Statement
|
||||
|
||||
I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?
|
||||
|
||||
## Solution
|
||||
|
||||
Under normal circumstances, this should be fine.
|
||||
|
||||
Imagine, you have the following producer.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Supplier<Message<String>> supply() {
|
||||
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
|
||||
}
|
||||
```
|
||||
|
||||
On the consumer side, you should still see the header "foo", and the following should not give you any issues.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Consumer<Message<String>> consume() {
|
||||
return s -> {
|
||||
final String foo = (String)s.getHeaders().get("foo");
|
||||
System.out.println(foo);
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
If you provide a https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties[custom header mapper] in the application, then this won't work.
|
||||
Let's say you have an empty `KafkaHeaderMapper` in the application.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
|
||||
return new KafkaHeaderMapper() {
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
If that is your implementation, then you will miss the `foo` header on the consumer.
|
||||
Chances are that, you may have some logic inside those `KafkaHeaderMapper` methods.
|
||||
You need the following to populate the `foo` header.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
|
||||
return new KafkaHeaderMapper() {
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
final String foo = (String) headers.get("foo");
|
||||
target.add("foo", foo.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
final Header foo = source.lastHeader("foo");
|
||||
target.put("foo", new String(foo.value()));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
That will properly populate the `foo` header from the producer to consumer.
|
||||
|
||||
## Special note on the id header
|
||||
|
||||
In Spring Cloud Stream, the `id` header is a special header, but some applications may want to have special custom id headers - something like `custom-id` or `ID` or `Id`.
|
||||
The first one (`custom-id`) will propagate without any custom header mapper from producer to consumer.
|
||||
However, if you produce with a variant of the framework reserved `id` header - such as `ID`, `Id`, `iD` etc. then you will run into issues with the internals of the framework.
|
||||
See this StackOverflow thread fore more context on this use case: https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive
|
||||
In that case, you must use a custom `KafkaHeaderMapper` to map the case-sensitive id header.
|
||||
For example, let's say you have the following producer.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Supplier<Message<String>> supply() {
|
||||
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
|
||||
}
|
||||
```
|
||||
|
||||
The header `Id` above will be gone from the consuming side as it clashes with the framework `id` header.
|
||||
You can provide a custom `KafkaHeaderMapper` to solve this issue.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
|
||||
return new KafkaHeaderMapper() {
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
final String myId = (String) headers.get("Id");
|
||||
target.add("Id", myId.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
final Header Id = source.lastHeader("Id");
|
||||
target.put("Id", new String(Id.value()));
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
By doing this, both `id` and `Id` headers will be available from the producer to the consumer side.
|
||||
Reference in New Issue
Block a user