== Tips, Tricks and Recipes

=== Simple DLQ with Kafka

==== Problem Statement

As a developer, I want to write a consumer application that processes records from a Kafka topic.
However, if some error occurs in processing, I don't want the application to stop completely.
Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.

==== Solution

The solution for this problem is to use the DLQ feature in Spring Cloud Stream.
For the purposes of this discussion, let us assume that the following is our processor function.

```
@Bean
public Consumer<byte[]> processData() {
    return s -> {
        throw new RuntimeException();
    };
}
```

This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.

In order to send the records in error to a DLT, we need to provide the following configuration.

```
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
```

In order to activate DLQ, the application must provide a group name.
Anonymous consumers cannot use the DLQ facilities.
We also need to enable DLQ by setting the `enableDlq` property on the Kafka consumer binding to `true`.
Finally, we can optionally provide the DLT name by setting `dlqName` on the Kafka consumer binding, which otherwise defaults to `error.input-topic.my-group` in this case.

Note that in the example consumer provided above, the type of the payload is `byte[]`.
By default, the DLQ producer in the Kafka binder expects a payload of type `byte[]`.
If that is not the case, then we need to provide the configuration for the proper serializer.
For example, let us re-write the consumer function as below:

```
@Bean
public Consumer<String> processData() {
    return s -> {
        throw new RuntimeException();
    };
}
```

Now, we need to tell Spring Cloud Stream how we want to serialize the data when writing to the DLT.
Here is the modified configuration for this scenario:

```
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

=== DLQ with Advanced Retry Options

==== Problem Statement

This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.

==== Solution

If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.

By default, the binder retries for a maximum of 3 attempts, with a one-second initial delay, a 2.0 back-off multiplier, and a maximum delay of 10 seconds.
You can change all of these configurations as below:

```
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
```
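
For example, a configuration along the following lines (the values are only illustrative) raises the number of attempts and stretches the back-off window; the intervals are in milliseconds:

```
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts=5
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval=2000
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval=30000
```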

If you want, you can also provide a list of retryable exceptions by providing a map of boolean values.
For example,

```
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
```

By default, any exceptions not listed in the map above will be retried.
If that is not desired, then you can disable that by providing,

```
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
```

You can also provide your own `RetryTemplate` and mark it as `@StreamRetryTemplate`, which will be scanned and used by the binder.
This is useful when you want more sophisticated retry strategies and policies.
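
As a rough sketch (the retry policy and back-off values here are only an assumed example), such a bean could be built with `RetryTemplate`, `SimpleRetryPolicy`, and `FixedBackOffPolicy` from Spring Retry:

```
@Bean
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    // Retry a failed record at most 5 times (illustrative value).
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
    // Wait a fixed two seconds between attempts (illustrative value).
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(2000L);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}
```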

If you have multiple `@StreamRetryTemplate` beans, then you can specify which one your binding wants by using the property,

```
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
```

=== Handling Deserialization errors with DLQ

==== Problem Statement

I have a processor that encounters a deserialization exception in the Kafka consumer.
I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not.
How can I handle this?

==== Solution

The normal DLQ mechanism offered by Spring Cloud Stream will not help when the Kafka consumer throws an irrecoverable deserialization exception.
This is because this exception happens even before the consumer's `poll()` method returns.
The Spring for Apache Kafka project offers some great ways to help the binder with this situation.
Let us explore those.

Assuming this is our function:

```
@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}
```

It is a trivial function that takes a `String` parameter.

We want to bypass the message converters provided by Spring Cloud Stream and use native deserializers instead.
In the case of `String` types, it does not make much sense, but for more complex types like AVRO you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.

Now when the consumer receives the data, let us assume that there is a bad record that causes a deserialization error, maybe because someone passed an `Integer` instead of a `String`, for example.
In that case, if you don't do something in the application, the exception will be propagated through the chain and your application will eventually exit.

In order to handle this, you can add a `ListenerContainerCustomizer` `@Bean` that configures a `SeekToCurrentErrorHandler`.
This `SeekToCurrentErrorHandler` is configured with a `DeadLetterPublishingRecoverer`.
We also need to configure an `ErrorHandlingDeserializer` for the consumer.
That sounds like a lot of complex things, but in reality, it boils down to these three beans in this case.

```
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
    return (container, dest, group) -> {
        container.setErrorHandler(errorHandler);
    };
}
```

```
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
```

```
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}
```

Let us analyze each of them.
The first one is the `ListenerContainerCustomizer` bean that takes a `SeekToCurrentErrorHandler`.
The container is now customized with that particular error handler.
You can learn more about container customization https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration[here].

The second bean is the `SeekToCurrentErrorHandler` that is configured with publishing to a `DLT`.
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current[here] for more details on `SeekToCurrentErrorHandler`.

The third bean is the `DeadLetterPublishingRecoverer` that is ultimately responsible for sending to the `DLT`.
By default, the `DLT` topic is named after the original topic, suffixed with `.DLT`.
You can change that though.
See the https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters[docs] for more details.

We also need to configure an https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer[ErrorHandlingDeserializer] through application config.

The `ErrorHandlingDeserializer` delegates to the actual deserializer.
In case of errors, it sets the key/value of the record to null and includes the raw bytes of the message.
It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.

Following is the configuration required:

```
spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
        use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
```

We are providing the `ErrorHandlingDeserializer` through the `configuration` property on the binding.
We are also indicating that the actual deserializer to delegate to is the `StringDeserializer`.

Keep in mind that none of the DLQ properties above are relevant for the discussions in this recipe.
They are purely meant for addressing application-level errors.

=== Basic offset management in Kafka binder

==== Problem Statement

I want to write a Spring Cloud Stream Kafka consumer application and I am not sure how it manages Kafka consumer offsets.
Can you explain?

==== Solution

We encourage you to read the https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#reset-offsets[docs] section on this to get a thorough understanding of it.

Here it is in a gist:

Kafka supports two types of starting offsets by default - `earliest` and `latest`.
Their semantics are self-explanatory from their names.

Assume that you are running the consumer for the first time.
If you omit the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer.
Whenever you have an anonymous consumer, the Spring Cloud Stream application by default starts from the `latest` available offset in the topic partition.
On the other hand, if you explicitly specify a group.id, then by default, the Spring Cloud Stream application starts from the `earliest` available offset in the topic partition.

In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched around by using the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset` and setting it to either `earliest` or `latest`.

Now, assume that you already ran the consumer before and are now starting it again.
In this case, the starting offset semantics above do not apply, as the consumer finds an already committed offset for the consumer group (in the case of an anonymous consumer, although the application does not provide a group.id, the binder auto-generates one for you).
It simply picks up from the last committed offset onward.
This is true even when you have a `startOffset` value provided.

However, you can override the default behavior where the consumer starts from the last committed offset by using the `resetOffsets` property.
In order to do that, set the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets` to `true` (which is `false` by default).
Then make sure you provide the `startOffset` value (either `earliest` or `latest`).
When you do that and then start the consumer application, each time it starts, it behaves as if it were starting for the first time and ignores any committed offsets for the partition.
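
For example, a minimal sketch of such a configuration (the binding, group, and topic names are only placeholders) could look like this, forcing the consumer back to `earliest` on every start:

```
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          resetOffsets: true
          startOffset: earliest
```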

=== Seeking to arbitrary offsets in Kafka

==== Problem Statement

Using the Kafka binder, I know that it can set the offset to either `earliest` or `latest`, but I have a requirement to seek to something in the middle, an arbitrary offset.
Is there a way to achieve this using the Spring Cloud Stream Kafka binder?

==== Solution

Previously we saw how the Kafka binder allows you to tackle basic offset management.
By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that recipe.
However, there are some low-level strategies that the binder provides to achieve this use case.
Let's explore them.

First of all, when you want to reset to an arbitrary offset other than `earliest` or `latest`, make sure to leave the `resetOffsets` configuration at its default, which is `false`.
Then you have to provide a custom bean of type `KafkaBindingRebalanceListener`, which will be injected into all consumer bindings.
It is an interface that comes with a few default methods, but here is the method that we are interested in:

```
/**
 * Invoked when partitions are initially assigned or after a rebalance. Applications
 * might only want to perform seek operations on an initial assignment. While the
 * 'initial' argument is true for each thread (when concurrency is greater than 1),
 * implementations should keep track of exactly which partitions have been sought.
 * There is a race in that a rebalance could occur during startup and so a topic/
 * partition that has been sought on one thread may be re-assigned to another
 * thread and you may not wish to re-seek it at that time.
 * @param bindingName the name of the binding.
 * @param consumer the consumer.
 * @param partitions the partitions.
 * @param initial true if this is the initial assignment on the current thread.
 */
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions, boolean initial) {
    // do nothing
}
```

Let us look at the details.

In essence, this method will be invoked each time during the initial assignment for a topic partition or after a rebalance.
For better illustration, let us assume that our topic is `foo` and it has 4 partitions.
Initially, we are only starting a single consumer in the group and this consumer will consume from all partitions.
When the consumer starts for the first time, all 4 partitions are initially assigned.
However, we do not want the partitions to start consuming at the default offset (`earliest`, since we define a group); rather, we want each partition to consume after seeking to an arbitrary offset.
Imagine that you have a business case to consume from certain offsets as below.

```
Partition   start offset

0           1000
1           2000
2           2000
3           1000
```

This could be achieved by implementing the above method as below.

```
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}
```

This is just a rudimentary implementation.
Real-world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch.
When the consumer `seek` fails, it may throw a runtime exception and you need to decide what to do in those cases.
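
For completeness, here is a minimal sketch of how such an implementation could be exposed as a bean so that the binder picks it up; the seeking logic shown above would go inside the overridden method:

```
@Bean
public KafkaBindingRebalanceListener kafkaBindingRebalanceListener() {
    return new KafkaBindingRebalanceListener() {

        @Override
        public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                Collection<TopicPartition> partitions, boolean initial) {
            // Seek to the desired offsets here, as shown in the implementation above.
        }
    };
}
```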

==== What if we start a second consumer with the same group id?

When we add a second consumer, a rebalance will occur and some partitions will be moved around.
Let's say that the new consumer gets partitions `2` and `3`.
When this new Spring Cloud Stream consumer calls this `onPartitionsAssigned` method, it will see that this is the initial assignment for partitions `2` and `3` on this consumer.
Therefore, it will do the seek operation because of the conditional check on the `initial` argument.
In the case of the first consumer, it now only has partitions `0` and `1`.
However, for this consumer it was simply a rebalance event and not considered an initial assignment.
Thus, it will not re-seek to the given offsets because of the conditional check on the `initial` argument.

=== How do I manually acknowledge using Kafka binder?

==== Problem Statement

Using the Kafka binder, I want to manually acknowledge messages in my consumer.
How do I do that?

==== Solution

By default, the Kafka binder delegates to the default commit settings in the Spring for Apache Kafka project.
The default `ackMode` in Spring Kafka is `batch`.
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets[here] for more details on that.

There are situations in which you want to disable this default commit behavior and rely on manual commits.
The following steps allow you to do that.

Set the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode` to either `MANUAL` or `MANUAL_IMMEDIATE`.
When it is set like that, there will be a header called `kafka_acknowledgment` (from `KafkaHeaders.ACKNOWLEDGMENT`) present in the message received by the consumer method.

For example, imagine this as your consumer method.

```
@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}
```

Then you set the property `spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode` to `MANUAL` or `MANUAL_IMMEDIATE`.
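
For instance, with the consumer bean above, the corresponding entry in the application configuration could look like this:

```
spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode: MANUAL_IMMEDIATE
```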

=== How do I override the default binding names in Spring Cloud Stream?

==== Problem Statement

Spring Cloud Stream creates default bindings based on the function definition and signature, but how do I override these to more domain-friendly names?

==== Solution

Assume that the following is your function signature.

```
@Bean
public Function<String, String> uppercase() {
    ...
}
```

By default, Spring Cloud Stream will create the bindings as below.

1. uppercase-in-0
2. uppercase-out-0

You can override these bindings to something else by using the following properties.

```
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
```

After this, all binding properties must be set on the new names, `my-transformer-in` and `my-transformer-out`.
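
For example, to point the renamed bindings at concrete destinations (the topic names here are only placeholders), you would now write:

```
spring.cloud.stream.bindings.my-transformer-in.destination=my-input-topic
spring.cloud.stream.bindings.my-transformer-out.destination=my-output-topic
```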

Here is another example with Kafka Streams and multiple inputs.

```
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
    ...
}
```

By default, Spring Cloud Stream will create three different binding names for this function.

1. processOrder-in-0
2. processOrder-in-1
3. processOrder-out-0

You have to use these binding names each time you want to set some configuration on these bindings.
You may not like that, and you may want to use more domain-friendly and readable binding names, for example, something like:

1. orders
2. accounts
3. enrichedOrders

You can easily do that by simply setting these three properties.

1. spring.cloud.stream.function.bindings.processOrder-in-0=orders
2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts
3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

Once you do that, it overrides the default binding names and any properties that you want to set on them must be on these new binding names.

=== How do I send a message key as part of my record?

==== Problem Statement

I need to send a key along with the payload of the record. Is there a way to do that in Spring Cloud Stream?

==== Solution

It is often necessary to send a key along with the value of a record, as in an associative data structure such as a map.
Spring Cloud Stream allows you to do that in a straightforward manner.
Following is a basic blueprint for doing this, but you may want to adapt it to your particular use case.

Here is a sample producer method (aka a `Supplier`).

```
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
```

This is a trivial function that sends a message with a `String` payload, but also with a key.
Note that we set the key as a message header using `KafkaHeaders.MESSAGE_KEY`.

If you want to take the key from a header other than the default `kafka_messageKey`, then we need to specify this property in the configuration:

```
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
```

Please note that we use the binding name `supplier-out-0` since that is our function name; please update it accordingly.

Then, we use this new key when we produce the message.
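
In that case, the supplier would set the custom header instead; a minimal sketch (the header name `my-special-key` simply matches the expression above):

```
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader("my-special-key", "my-foo").build();
}
```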

=== How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?

==== Problem Statement

Instead of using the message converters in Spring Cloud Stream, I want to use the native serializer and deserializer in Kafka.
By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters.
How can I bypass this and delegate the responsibility to Kafka?

==== Solution

This is really easy to do.

All you have to do is to provide the following property to enable native serialization.

```
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
```

Then, you need to also set the serializers.
There are a couple of ways to do this.

```
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

or using the binder configuration.

```
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

When using the binder way, it is applied against all the bindings, whereas setting them at the binding level applies per binding.

On the deserializing side, you just need to provide the deserializers as configuration.

For example,

```
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
```

You can also set them at the binder level.

There is an optional property that you can set to force native decoding.

```
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
```

However, in the case of the Kafka binder, this is unnecessary, as by the time the data reaches the binder, Kafka has already deserialized it using the configured deserializers.

=== Explain how offset resetting works in Kafka Streams binder

==== Problem Statement

By default, the Kafka Streams binder always starts from the earliest offset for a new consumer.
Sometimes, it is beneficial or required by the application to start from the latest offset.
The Kafka Streams binder allows you to do that.

==== Solution

Before we look at the solution, let us look at the following scenario.

```
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer() {
    return (s, t) -> s.join(t, ...)
    ...
}
```

We have a `BiConsumer` bean that requires two input bindings.
In this case, the first binding is for a `KStream` and the second one is for a `KTable`.
When running this application for the first time, by default, both bindings start from the `earliest` offset.
What if I want to start from the `latest` offset due to some requirements?
You can do this by enabling the following properties.

```
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
```

If you want only one binding to start from the `latest` offset and the other to consume from the default `earliest`, then leave the latter binding out of the configuration.

Keep in mind that, once there are committed offsets, these settings are *not* honored and the committed offsets take precedence.

=== Keeping track of successful sending of records (producing) in Kafka

==== Problem Statement

I have a Kafka producer application and I want to keep track of all of my successful sends.

==== Solution

Let us assume that we have the following supplier in the application.

```
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
```

Then, we need to define a new `MessageChannel` bean to capture all the successful send information.

```
@Bean
public MessageChannel fooRecordChannel() {
    return new DirectChannel();
}
```

Next, define this property in the application configuration to provide the bean name for the `recordMetadataChannel`.

```
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
```

At this point, successful send information will be sent to the `fooRecordChannel`.

You can write an `IntegrationFlow` as below to see the information.

```
@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
            .handle((payload, messageHeaders) -> payload);
}
```

In the `handle` method, the payload is what got sent to Kafka, and the message headers contain a special key called `kafka_recordMetadata`.
Its value is a `RecordMetadata` that contains information about the topic partition, the current offset, and so on.
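
As a variant of the flow above (a rough sketch, assuming the same `fooRecordChannel`), the handler could pull that `RecordMetadata` out of the headers and print it:

```
@Bean
public IntegrationFlow recordMetadataFlow() {
    return f -> f.channel("fooRecordChannel")
            .handle((payload, messageHeaders) -> {
                // The binder populates this header with the RecordMetadata returned by the Kafka producer.
                RecordMetadata metadata = (RecordMetadata) messageHeaders.get("kafka_recordMetadata");
                System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
                return null; // end the flow here
            });
}
```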

=== Adding custom header mapper in Kafka

==== Problem Statement

I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?

==== Solution

Under normal circumstances, this should be fine.

Imagine that you have the following producer.

```
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
```

On the consumer side, you should still see the header "foo", and the following should not give you any issues.

```
@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String) s.getHeaders().get("foo");
        System.out.println(foo);
    };
}
```

If you provide a https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties[custom header mapper] in the application, then this won't work.
Let's say you have an empty `KafkaHeaderMapper` in the application.

```
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}
```

If that is your implementation, then you will miss the `foo` header on the consumer.
Chances are that you have some logic inside those `KafkaHeaderMapper` methods.
You need the following to populate the `foo` header.

```
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
            target.put("foo", new String(foo.value()));
        }
    };
}
```

That will properly populate the `foo` header from the producer to the consumer.

==== Special note on the id header

In Spring Cloud Stream, the `id` header is a special header, but some applications may want to have special custom id headers - something like `custom-id` or `ID` or `Id`.
The first one (`custom-id`) will propagate without any custom header mapper from producer to consumer.
However, if you produce with a variant of the framework-reserved `id` header - such as `ID`, `Id`, `iD`, and so on - then you will run into issues with the internals of the framework.
See this https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive[StackOverflow thread] for more context on this use case.
In that case, you must use a custom `KafkaHeaderMapper` to map the case-sensitive id header.
For example, let's say you have the following producer.

```
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
```

The header `Id` above will be gone from the consuming side, as it clashes with the framework's `id` header.
You can provide a custom `KafkaHeaderMapper` to solve this issue.

```
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
            target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
            target.put("Id", new String(Id.value()));
        }
    };
}
```

By doing this, both the `id` and `Id` headers will be available from the producer to the consumer side.

=== Producing to multiple topics in transaction

==== Problem Statement

How do I produce transactional messages to multiple Kafka topics?

For more context, see this https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[StackOverflow question].

==== Solution

Use the transactional support in the Kafka binder for transactions and then provide an `AfterRollbackProcessor`.
In order to produce to multiple topics, use the `StreamBridge` API.

Below are the code snippets for this:

```
@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
```

==== Required Configuration

```
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
```

In order to test, you can use the following:

```
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}
```

Some important notes:

Please ensure that you don't have any DLQ settings in the application configuration, as we manually configure the DLT (by default, failed records are published to a topic named `input.DLT`, based on the initial consumer function).
Also, set `maxAttempts` on the consumer binding to `1` in order to avoid retries by the binder.
In the example above, a record is tried a maximum of 3 times in total (the initial try plus the 2 attempts in the `FixedBackOff`).

See the https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[StackOverflow thread] for more details on how to test this code.
If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the `isolation-level` on the consumer binding to `read-committed`.
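
For example, one way to do that (assuming the Kafka consumer property is passed through the binding's `configuration` map, using Kafka's own `read_committed` value) could be:

```
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.isolation.level=read_committed
```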

This https://stackoverflow.com/questions/68941306/spring-cloud-stream-database-transaction-does-not-roll-back[StackOverflow thread] is also related to this discussion.

=== Pitfalls to avoid when running multiple pollable consumers

==== Problem Statement

How can I run multiple instances of the pollable consumers and generate a unique `client.id` for each instance?

==== Solution

Assuming that you have the following definition:

```
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
```

When running the application, the Kafka consumer generates a `client.id` (something like `consumer-my-group-1`).
For each instance of the application that is running, this `client.id` will be the same, causing unexpected issues.

In order to fix this, you can add the following property on each instance of the application:

```
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
```

See this https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1139[GitHub issue] for more details.