diff --git a/recipes/recipe-13-producing-to-multiple-topics-in-transaction.adoc b/recipes/recipe-13-producing-to-multiple-topics-in-transaction.adoc new file mode 100644 index 0000000..a5fb4bb --- /dev/null +++ b/recipes/recipe-13-producing-to-multiple-topics-in-transaction.adoc @@ -0,0 +1,83 @@ +# Producing to multiple topics in transaction + +## Problem Statement + +How do I produce transactional messages to multiple Kafka topics? + +For more context, see this SO thread: https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud + +## Solution + +Use transactional support in Kafka binder for transactions and then provide an `AfterRollbackProcessor`. +In order to produce to multiple topics, use `StreamBridge` API. + +Below are the code snippets for this: + +``` +@Autowired +StreamBridge bridge; + +@Bean +Consumer input() { + return str -> { + System.out.println(str); + this.bridge.send("left", str.toUpperCase()); + this.bridge.send("right", str.toLowerCase()); + if (str.equals("Fail")) { + throw new RuntimeException("test"); + } + }; +} + +@Bean +ListenerContainerCustomizer> customizer(BinderFactory binders) { + return (container, dest, group) -> { + ProducerFactory pf = ((KafkaMessageChannelBinder) binders.getBinder(null, + MessageChannel.class)).getTransactionalProducerFactory(); + KafkaTemplate template = new KafkaTemplate<>(pf); + DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template); + container.setAfterRollbackProcessor(rollbackProcessor); + }; +} + +DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate template) { + return new DefaultAfterRollbackProcessor<>( + new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true); +} + +``` + +### Required Configuration + +``` +spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx- +spring.cloud.stream.kafka.binder.required-acks=all +spring.cloud.stream.bindings.input-in-0.group=foo +spring.cloud.stream.bindings.input-in-0.destination=input +spring.cloud.stream.bindings.left.destination=left +spring.cloud.stream.bindings.right.destination=right + +spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1 +``` + +in order to test, you can use the following: + +``` +@Bean +public ApplicationRunner runner(KafkaTemplate template) { + return args -> { + System.in.read(); + template.send("input", "Fail".getBytes()); + template.send("input", "Good".getBytes()); + }; +} +``` + +Some important notes: + +Please ensure that you don't have any DLQ settings on the application configuration as we manually configure DLT (By default it will be published to a topic named `input.DLT` based on the initial consumer function). +Also, reset the `maxAttempts` on consumer binding to `1` in order to avoid retries by the binder. +It will be max tried a total of 3 in the example above (initial try + the 2 attempts in the `FixedBackoff`). + +See the https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[SO thread] for more details on how to test this code. +If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the `isolation-level` on the consumer binding to `read-committed`. \ No newline at end of file