From 35c32db6617d69415ca8d937898526cc03e50f3d Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 12 Jul 2021 16:12:09 -0400 Subject: [PATCH] New recipe for how to keep track of successful publishing to Kafka. --- ...1-keep-track-of-successful-send-kafka.adoc | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 recipes/recipe-11-keep-track-of-successful-send-kafka.adoc diff --git a/recipes/recipe-11-keep-track-of-successful-send-kafka.adoc b/recipes/recipe-11-keep-track-of-successful-send-kafka.adoc new file mode 100644 index 0000000..30261f6 --- /dev/null +++ b/recipes/recipe-11-keep-track-of-successful-send-kafka.adoc @@ -0,0 +1,48 @@ +# Keeping track of successful sending of records (producing) in Kafka + +## Problem Statement + +I have a Kafka producer application and I want to keep track of all my successful sedings. + +## Solution + +Let us assume that we have this following supplier in the application. + +``` +@Bean + public Supplier> supplier() { + return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build(); + } +``` + +Then, we need to define a new `MessageChannel` bean to capture all the successful send information. + +``` +@Bean + public MessageChannel fooRecordChannel() { + return new DirectChannel(); + } +``` + +Next, define this property in the application configuration to provide the bean name for the `recordMetadataChannel`. + +``` +spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel +``` + +At this point, successful sent information will be sent to the `fooRecordChannel`. + +You can write an `IntegrationFlow` as below to see the information. + +``` +@Bean +public IntegrationFlow integrationFlow() { + return f -> f.channel("fooRecordChannel") + .handle((payload, messageHeaders) -> payload); +} +``` + +In the `handle` method, the payload is what got sent to Kafka and the message headers contain a special key called `kafka_recordMetadata`. +Its value is a `RecordMetadata` that contains information about topic partition, current offset etc. + +