GH-861: Add transaction manager bean override

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/861

Add docs
This commit is contained in:
Gary Russell
2020-03-16 15:42:27 -04:00
committed by Soby Chacko
parent 65f8cc5660
commit 07a740a5b5
5 changed files with 140 additions and 15 deletions

View File

@@ -281,6 +281,12 @@ pollTimeout::
Timeout used for polling in pollable consumers.
+
Default: 5 seconds.
transactionManager::
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManager`.
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
==== Consuming Batches
@@ -392,6 +398,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
+
Default: `none`.
transactionManager::
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManager`.
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
==== Usage examples

View File

@@ -150,6 +150,7 @@ public class KafkaConsumerProperties {
/**
* @deprecated No longer used by the binder.
*/
@Deprecated
private int recoveryInterval = 5000;
/**
@@ -194,6 +195,11 @@ public class KafkaConsumerProperties {
*/
private long pollTimeout = org.springframework.kafka.listener.ConsumerProperties.DEFAULT_POLL_TIMEOUT;
/**
* Transaction manager bean name - overrides the binder's transaction configuration.
*/
private String transactionManager;
/**
* @return if each record needs to be acknowledged.
*
@@ -462,4 +468,18 @@ public class KafkaConsumerProperties {
public void setPollTimeout(long pollTimeout) {
this.pollTimeout = pollTimeout;
}
/**
 * @return the transaction manager bean name.
 *
 * Bean name of a {@code KafkaAwareTransactionManager} used to override the
 * binder's transaction manager for this binding (null means use the binder's
 * configured transaction manager, if any).
 */
public String getTransactionManager() {
return this.transactionManager;
}
/**
 * Set the transaction manager bean name; the bean must be a
 * {@code KafkaAwareTransactionManager}.
 * @param transactionManager the transaction manager bean name.
 */
public void setTransactionManager(String transactionManager) {
this.transactionManager = transactionManager;
}
}

View File

@@ -94,6 +94,11 @@ public class KafkaProducerProperties {
*/
private String recordMetadataChannel;
/**
* Transaction manager bean name - overrides the binder's transaction configuration.
*/
private String transactionManager;
/**
* @return buffer size
*
@@ -244,6 +249,19 @@ public class KafkaProducerProperties {
this.recordMetadataChannel = recordMetadataChannel;
}
/**
 * @return the transaction manager bean name.
 *
 * Bean name of a {@code KafkaAwareTransactionManager} used to override the
 * binder's transaction manager for this binding (null means use the binder's
 * configured transaction manager, if any).
 */
public String getTransactionManager() {
return this.transactionManager;
}
/**
 * Set the transaction manager bean name; the bean must be a
 * {@code KafkaAwareTransactionManager}.
 * @param transactionManager the transaction manager bean name.
 */
public void setTransactionManager(String transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Enumeration for compression types.
*/

View File

@@ -109,6 +109,7 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
@@ -342,9 +343,12 @@ public class KafkaMessageChannelBinder extends
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
* Individual bindings can override the binder's transaction manager.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
producerProperties.getExtension().getTransactionManager());
final ProducerFactory<byte[], byte[]> producerFB = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
@@ -352,7 +356,7 @@ public class KafkaMessageChannelBinder extends
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
if (transMan == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
@@ -383,7 +387,7 @@ public class KafkaMessageChannelBinder extends
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
if (this.transactionManager != null) {
if (transMan != null) {
kafkaTemplate.setTransactionIdPrefix(configurationProperties.getTransaction().getTransactionIdPrefix());
}
ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(
@@ -520,7 +524,7 @@ public class KafkaMessageChannelBinder extends
@Override
protected boolean useNativeEncoding(
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
if (this.transactionManager != null) {
if (transactionManager(producerProperties.getExtension().getTransactionManager()) != null) {
return this.configurationProperties.getTransaction().getProducer()
.isUseNativeEncoding();
}
@@ -586,8 +590,10 @@ public class KafkaMessageChannelBinder extends
? new ContainerProperties(Pattern.compile(topics[0]))
: new ContainerProperties(topics)
: new ContainerProperties(topicPartitionOffsets);
if (this.transactionManager != null) {
containerProperties.setTransactionManager(this.transactionManager);
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
extendedConsumerProperties.getExtension().getTransactionManager());
if (transMan != null) {
containerProperties.setTransactionManager(transMan);
}
if (this.rebalanceListener != null) {
setupRebalanceListener(extendedConsumerProperties, containerProperties);
@@ -653,14 +659,14 @@ public class KafkaMessageChannelBinder extends
consumerGroup, extendedConsumerProperties);
if (!extendedConsumerProperties.isBatchMode()
&& extendedConsumerProperties.getMaxAttempts() > 1
&& this.transactionManager == null) {
&& transMan == null) {
kafkaMessageDrivenChannelAdapter
.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
kafkaMessageDrivenChannelAdapter
.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
else if (!extendedConsumerProperties.isBatchMode() && this.transactionManager != null) {
else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
(record, exception) -> {
MessagingException payload =
@@ -1050,8 +1056,10 @@ public class KafkaMessageChannelBinder extends
if (kafkaConsumerProperties.isEnableDlq()) {
KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties
.getDlqProducerProperties();
ProducerFactory<?, ?> producerFactory = this.transactionManager != null
? this.transactionManager.getProducerFactory()
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
properties.getExtension().getTransactionManager());
ProducerFactory<?, ?> producerFactory = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null,
new ExtendedProducerProperties<>(dlqProducerProperties));
final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(
@@ -1066,7 +1074,7 @@ public class KafkaMessageChannelBinder extends
if (properties.isUseNativeDecoding()) {
if (record != null) {
Map<String, String> configuration = this.transactionManager == null
Map<String, String> configuration = transMan == null
? dlqProducerProperties.getConfiguration()
: this.configurationProperties.getTransaction()
.getProducer().getConfiguration();
@@ -1180,6 +1188,15 @@ public class KafkaMessageChannelBinder extends
return null;
}
/**
 * Resolve the transaction manager for a binding: when a bean name is supplied via
 * the binding's {@code transactionManager} property, look up that
 * {@code KafkaAwareTransactionManager} bean in the application context; otherwise
 * fall back to the binder's own transaction manager field (which may be null when
 * the binder is not transactional — callers null-check the result).
 * @param beanName the per-binding transaction manager bean name, or null/empty.
 * @return the resolved transaction manager, or null.
 */
@SuppressWarnings("unchecked")
@Nullable
private KafkaAwareTransactionManager<byte[], byte[]> transactionManager(@Nullable String beanName) {
if (StringUtils.hasText(beanName)) {
return getApplicationContext().getBean(beanName, KafkaAwareTransactionManager.class);
}
return this.transactionManager;
}
private DlqPartitionFunction determinDlqPartitionFunction(Integer dlqPartitions) {
if (this.dlqPartitionFunction != null) {
return this.dlqPartitionFunction;

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.integration2;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,22 +34,30 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.backoff.FixedBackOff;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* @author Gary Russell
@@ -63,6 +72,8 @@ import static org.assertj.core.api.Assertions.assertThat;
"spring.cloud.stream.bindings.input.destination=consumer.producer.txIn",
"spring.cloud.stream.bindings.input.group=consumer.producer.tx",
"spring.cloud.stream.bindings.input.consumer.max-attempts=1",
"spring.cloud.stream.kafka.bindings.input2.consumer.transaction-manager=tm",
"spring.cloud.stream.kafka.bindings.output2.producer.transaction-manager=tm",
"spring.cloud.stream.bindings.output.destination=consumer.producer.txOut",
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
@@ -100,7 +111,17 @@ public class ConsumerProducerTransactionTests {
assertThat(this.config.outs).containsExactlyInAnyOrder("ONE", "THREE");
}
@EnableBinding(Processor.class)
/**
 * Verify the per-binding transaction manager override: the input2 container must
 * use the mock 'tm' bean (not the binder's default), and the output2 message
 * handler's KafkaTemplate must use that transaction manager's producer factory.
 */
@Test
public void externalTM() {
assertThat(this.config.input2Container.getContainerProperties().getTransactionManager())
.isSameAs(this.config.tm);
// Reach into the output2 channel's dispatcher to find the producing handler.
Object handler = KafkaTestUtils.getPropertyValue(this.config.output2, "dispatcher.handlers", Set.class)
.iterator().next();
assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory"))
.isSameAs(this.config.pf);
}
@EnableBinding(TwoProcessors.class)
@EnableAutoConfiguration
public static class Config {
@@ -111,6 +132,15 @@ public class ConsumerProducerTransactionTests {
@Autowired
private MessageChannel output;
@Autowired
MessageChannel output2;
AbstractMessageListenerContainer<?, ?> input2Container;
ProducerFactory pf;
KafkaAwareTransactionManager<byte[], byte[]> tm;
@KafkaListener(id = "test.cons.prod", topics = "consumer.producer.txOut")
public void listenOut(String in) {
this.outs.add(in);
@@ -125,6 +155,10 @@ public class ConsumerProducerTransactionTests {
}
}
@StreamListener("input2")
public void listenIn2(String in) {
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
@@ -136,10 +170,34 @@ public class ConsumerProducerTransactionTests {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> container
.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
return (container, dest, group) -> {
container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
if ("input2".equals(dest)) {
this.input2Container = container;
}
};
}
/**
 * Mock {@code KafkaAwareTransactionManager} bean referenced by the
 * input2/output2 bindings' {@code transaction-manager} properties; returns the
 * binder's producer factory so the override can be asserted in externalTM().
 * @param pf the binder's producer factory.
 * @return the mock transaction manager.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public KafkaAwareTransactionManager<byte[], byte[]> tm(ProducerFactory pf) {
KafkaAwareTransactionManager mock = mock(KafkaAwareTransactionManager.class);
this.pf = pf;
given(mock.getProducerFactory()).willReturn(pf);
this.tm = mock;
return mock;
}
}
/**
 * Processor with an additional input/output binding pair (input2/output2), used to
 * exercise the per-binding transaction manager override.
 */
public interface TwoProcessors extends Processor {
@Input
SubscribableChannel input2();
@Output
MessageChannel output2();
}
}