# Producing to multiple topics in a transaction
## Problem Statement
How do I produce transactional messages to multiple Kafka topics?
For more context, see this SO thread: https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud
## Solution
Use the transactional support in the Kafka binder and provide an `AfterRollbackProcessor`.
To produce to multiple topics, use the `StreamBridge` API.
The following code snippet shows this:
```java
@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
	return str -> {
		System.out.println(str);
		// Both sends participate in the transaction started for the consumer
		this.bridge.send("left", str.toUpperCase());
		this.bridge.send("right", str.toLowerCase());
		// Throwing here rolls back the records sent above
		if (str.equals("Fail")) {
			throw new RuntimeException("test");
		}
	};
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
	return (container, dest, group) -> {
		// Reuse the binder's transactional producer factory for the dead-letter template
		ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
				MessageChannel.class)).getTransactionalProducerFactory();
		KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
		container.setAfterRollbackProcessor(rollbackProcessor(template));
	};
}

DefaultAfterRollbackProcessor<byte[], byte[]> rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
	// Dead-letter after the initial attempt plus 2 retries, 2 seconds apart;
	// the final 'true' commits the recovered record's offset via the transactional template
	return new DefaultAfterRollbackProcessor<>(
			new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
```
### Required Configuration
```
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.bindings.input-in-0.consumer.maxAttempts=1
```
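Equivalently, the same settings can be expressed in `application.yml` form (one possible rendition; note that `maxAttempts` is a core binding consumer property under `spring.cloud.stream.bindings`, not a Kafka-binder one):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx-
          required-acks: all
      bindings:
        input-in-0:
          group: foo
          destination: input
          consumer:
            max-attempts: 1
        left:
          destination: left
        right:
          destination: right
```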
To test, you can use the following:
```java
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
	return args -> {
		// Wait for a keypress so the app is fully started before sending
		System.in.read();
		template.send("input", "Fail".getBytes());
		template.send("input", "Good".getBytes());
	};
}
```
Some important notes:
Ensure that you don't have any DLQ settings in the application configuration, since we configure dead-letter publishing manually (by default, failed records are published to a topic named `input.DLT`, derived from the destination of the initial consumer function).
Also, set `maxAttempts` on the consumer binding to `1` to avoid retries by the binder.
A record will then be tried at most 3 times in total in the example above (the initial attempt plus the 2 retries from the `FixedBackOff`).
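As a side note, this retry accounting can be illustrated with a small, Spring-free sketch. The class and method names below are hypothetical, purely for illustration; the real work is done by `DefaultAfterRollbackProcessor`:

```java
import java.util.ArrayList;
import java.util.List;

// Spring-free model of what the after-rollback processor above does with
// FixedBackOff(2000L, 2L): redeliver a failing record up to 2 more times after
// the initial attempt, then hand it to the recoverer (modelled here as a list
// standing in for the input.DLT topic).
public class RollbackRetryModel {

    static final int MAX_RETRIES = 2; // second argument of FixedBackOff(2000L, 2L)

    // Returns the number of delivery attempts made for the record.
    static int deliver(String record, List<String> dlt) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listen(record);   // the consumer function from the recipe would run here
                return attempts;  // success: the transaction commits
            }
            catch (RuntimeException ex) {
                if (attempts > MAX_RETRIES) {
                    dlt.add(record);  // retries exhausted: recoverer dead-letters the record
                    return attempts;
                }
                // the real processor would now wait 2000 ms (the FixedBackOff interval)
            }
        }
    }

    // Mimics the consumer function: "Fail" always rolls the transaction back.
    static void listen(String record) {
        if (record.equals("Fail")) {
            throw new RuntimeException("test");
        }
    }

    public static void main(String[] args) {
        List<String> dlt = new ArrayList<>();
        System.out.println(deliver("Good", dlt)); // prints 1
        System.out.println(deliver("Fail", dlt)); // prints 3: initial try + 2 retries
        System.out.println(dlt);                  // prints [Fail]
    }
}
```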
See the [SO thread](https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud) for more details on how to test this code.
If you use additional Spring Cloud Stream consumer functions to verify the output topics, make sure to set the `isolation-level` on those consumer bindings to `read-committed`, so that they do not see records from rolled-back transactions.
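For example, the Kafka consumer's `isolation.level` can be set through the binder's per-binding consumer `configuration` map (the binding name `check-in-0` here is hypothetical; a binder-wide setting via `spring.cloud.stream.kafka.binder.configuration` is the alternative):

```
spring.cloud.stream.kafka.bindings.check-in-0.consumer.configuration.isolation.level=read_committed
```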