New recipe for how to keep track of successful publishing to Kafka.
recipes/recipe-11-keep-track-of-successful-send-kafka.adoc
# Keeping track of successful sending of records (producing) in Kafka

## Problem Statement

I have a Kafka producer application, and I want to keep track of all of my successful sends.

## Solution

Let us assume that we have the following supplier in the application.

```
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
```

Then, we need to define a new `MessageChannel` bean to capture all the successful send information.

```
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
```

Next, set the following property in the application configuration to provide the bean name for the `recordMetadataChannel`.

```
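# the binding name follows Spring Cloud Stream's <function bean name>-out-<index> convention, hence supplier-out-0 for the supplier bean above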
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
```

At this point, information about each successful send will be sent to the `fooRecordChannel`.

You can write an `IntegrationFlow` as shown below to see this information.

```
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
```

In the `handle` method, the payload is what was sent to Kafka, and the message headers contain a special key called `kafka_recordMetadata`.
Its value is a `RecordMetadata` that contains information about the topic partition, the current offset, and so on.
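For example, the flow above could be expanded as follows to pull the `RecordMetadata` out of the headers and log the topic, partition, and offset. This is a minimal sketch rather than part of the original recipe; the bean name and the `System.out` logging are just illustrative.

```
@Bean
public IntegrationFlow recordMetadataLoggingFlow() {
    // RecordMetadata is org.apache.kafka.clients.producer.RecordMetadata,
    // KafkaHeaders is org.springframework.kafka.support.KafkaHeaders
    return f -> f.channel("fooRecordChannel")
            .handle((payload, messageHeaders) -> {
                // KafkaHeaders.RECORD_METADATA is the constant behind "kafka_recordMetadata"
                RecordMetadata metadata =
                        (RecordMetadata) messageHeaders.get(KafkaHeaders.RECORD_METADATA);
                System.out.println("Sent " + payload + " to topic " + metadata.topic()
                        + ", partition " + metadata.partition() + ", offset " + metadata.offset());
                // returning null ends the flow here instead of looking for an output channel
                return null;
            });
}
```

With a flow like this in place, the producer application should print one line for each record that Kafka acknowledges.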