GH-715: Add retry and DLQ for transactional binders

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/715

When using transactions, binder retry and DLQ cannot be used, because the retry
runs within the transaction, which is undesirable if another resource is
involved; also, publishing to the DLQ could be rolled back.

Use the retry properties to configure an `AfterRollbackProcessor` to perform the
retry and DLQ publishing after the transaction has rolled back.

Add docs; move the container customizer invocation to the end.
Gary Russell, 2019-11-12 13:26:30 -05:00 (committed by Soby Chacko)
parent 34b0945d43
commit 88912b8d6b
4 changed files with 145 additions and 30 deletions


@@ -561,6 +561,10 @@ When used in a processor application, the consumer starts the transaction; any r
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions, because the retries run in the original transaction, which may be rolled back; in that case, any published records would be rolled back too.
When retries are enabled (the common property `maxAttempts` is greater than one), the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor`, which runs after the main transaction has rolled back.
If you wish to use transactions in a source application, or from some arbitrary thread for a producer-only transaction (for example, in a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
====
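A bean definition along the following lines satisfies that requirement (a sketch, not code from this commit; it assumes the binder exposes its transactional producer factory via a `getTransactionalProducerFactory()` method and can be obtained from the `BinderFactory`):

[source, java]
----
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    // obtain the Kafka binder and reuse its transactional producer factory,
    // so this transaction manager and the binder share the same transactions
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders
            .getBinder(null, MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}
----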


@@ -80,6 +80,7 @@ import org.springframework.context.Lifecycle;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.channel.AbstractMessageChannel;
@@ -101,6 +102,7 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
@@ -117,10 +119,14 @@ import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.InterceptableChannel;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.ExponentialBackOff;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -197,6 +203,8 @@ public class KafkaMessageChannelBinder extends
private final KafkaTransactionManager<byte[], byte[]> transactionManager;
private final TransactionTemplate transactionTemplate;
private final KafkaBindingRebalanceListener rebalanceListener;
private final DlqPartitionFunction dlqPartitionFunction;
@@ -238,9 +246,11 @@ public class KafkaMessageChannelBinder extends
configurationProperties.getTransaction().getTransactionIdPrefix(),
new ExtendedProducerProperties<>(configurationProperties
.getTransaction().getProducer().getExtension())));
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
}
else {
this.transactionManager = null;
this.transactionTemplate = null;
}
this.rebalanceListener = rebalanceListener;
this.dlqPartitionFunction = dlqPartitionFunction != null
@@ -513,6 +523,7 @@ public class KafkaMessageChannelBinder extends
protected MessageProducer createConsumerEndpoint(
final ConsumerDestination destination, final String group,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(
!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
@@ -622,31 +633,89 @@ public class KafkaMessageChannelBinder extends
this.logger.debug("Listened partitions: "
+ StringUtils.collectionToCommaDelimitedString(listenedPartitions));
}
- this.getContainerCustomizer().configure(messageListenerContainer,
- destination.getName(), group);
// @checkstyle:off
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer,
extendedConsumerProperties.isBatchMode() ? ListenerMode.batch : ListenerMode.record);
// @checkstyle:on
- kafkaMessageDrivenChannelAdapter
- .setMessageConverter(getMessageConverter(extendedConsumerProperties));
MessagingMessageConverter messageConverter = getMessageConverter(extendedConsumerProperties);
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
consumerGroup, extendedConsumerProperties);
- if (!extendedConsumerProperties.isBatchMode() && extendedConsumerProperties.getMaxAttempts() > 1) {
if (!extendedConsumerProperties.isBatchMode()
&& extendedConsumerProperties.getMaxAttempts() > 1
&& this.transactionManager == null) {
kafkaMessageDrivenChannelAdapter
.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
kafkaMessageDrivenChannelAdapter
.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
- else {
- kafkaMessageDrivenChannelAdapter
- .setErrorChannel(errorInfrastructure.getErrorChannel());
else if (!extendedConsumerProperties.isBatchMode() && this.transactionManager != null) {
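// with a transactional binder, move retries and DLQ publishing from the adapter's
// retry template to the container's after-rollback processor, so that they run
// after, and outside of, the rolled-back transaction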
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
(record, exception) -> {
MessagingException payload =
new MessagingException(messageConverter.toMessage(record, null, null, null),
"Transaction rollback limit exceeded", exception);
try {
errorInfrastructure.getErrorChannel()
.send(new ErrorMessage(
payload,
Collections.singletonMap(IntegrationMessageHeaderAccessor.SOURCE_DATA,
record)));
}
catch (Exception e) {
/*
* When there is no DLQ, the FinalRethrowingErrorMessageHandler will re-throw
* the payload; that will subvert the recovery and cause a re-seek of the failed
* record, so we ignore that here.
*/
if (!e.equals(payload)) {
throw e;
}
}
}, createBackOff(extendedConsumerProperties)));
}
else {
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
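// run the user's container customizer last, so it can override anything set
// above (including the after-rollback processor configured for transactions)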
this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
return kafkaMessageDrivenChannelAdapter;
}
/**
* Configure a {@link BackOff} for the after rollback processor, based on the consumer
* retry properties. If retry is disabled, return a {@link BackOff} that disables
* retry. Otherwise, calculate an {@link ExponentialBackOff#setMaxElapsedTime(long)
* maxElapsedTime} so that the {@link BackOff} stops after the configured
* {@link ExtendedConsumerProperties#getMaxAttempts() maxAttempts}.
* @param extendedConsumerProperties the properties.
* @return the backoff.
*/
private BackOff createBackOff(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
int maxAttempts = extendedConsumerProperties.getMaxAttempts();
if (maxAttempts < 2) {
return new FixedBackOff(0L, 0L);
}
int initialInterval = extendedConsumerProperties.getBackOffInitialInterval();
double multiplier = extendedConsumerProperties.getBackOffMultiplier();
int maxInterval = extendedConsumerProperties.getBackOffMaxInterval();
ExponentialBackOff backOff = new ExponentialBackOff(initialInterval, multiplier);
backOff.setMaxInterval(maxInterval);
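// e.g. with maxAttempts=3, backOffInitialInterval=1000 and backOffMultiplier=2.0
// (the property defaults), the loop below yields maxElapsed = 1000 + 2000 = 3000ms:
// two back-offs, i.e. three delivery attempts in total, before the recoverer runs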
long maxElapsed = extendedConsumerProperties.getBackOffInitialInterval();
double accum = maxElapsed;
for (int i = 1; i < maxAttempts - 1; i++) {
accum = accum * multiplier;
if (accum > maxInterval) {
accum = maxInterval;
}
maxElapsed += accum;
}
backOff.setMaxElapsedTime(maxElapsed);
return backOff;
}
public void setupRebalanceListener(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ContainerProperties containerProperties) {
@@ -1039,8 +1108,11 @@ public class KafkaMessageChannelBinder extends
.getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN, throwable
.getClass().getName().getBytes(StandardCharsets.UTF_8)));
- kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
- throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
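// getMessage() can return null (e.g. for a NullPointerException); omit the header then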
String exceptionMessage = throwable.getMessage();
if (exceptionMessage != null) {
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
exceptionMessage.getBytes(StandardCharsets.UTF_8)));
}
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
getStackTraceAsString(throwable)
.getBytes(StandardCharsets.UTF_8)));
@@ -1082,8 +1154,17 @@ public class KafkaMessageChannelBinder extends
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
? kafkaConsumerProperties.getDlqName()
: "error." + record.topic() + "." + group;
- dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
- determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
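// when the binder is transactional, publish the DLQ record in its own transaction;
// this recoverer is invoked from the after-rollback processor, outside the original
// (rolled-back) transaction; throwable2 is an effectively final copy for the lambda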
if (this.transactionTemplate != null) {
Throwable throwable2 = throwable;
this.transactionTemplate.executeWithoutResult(status -> {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
});
}
else {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
}
};
}
return null;


@@ -167,7 +167,9 @@ public class KafkaBinderTests extends
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10,
"error.pollableDlq.group-pcWithDlq");
"error.pollableDlq.group-pcWithDlq")
.brokerProperty("transaction.state.log.replication.factor", "1")
.brokerProperty("transaction.state.log.min.isr", "1");
private KafkaTestBinder binder;
@@ -865,11 +867,21 @@ public class KafkaBinderTests extends
testDlqGuts(true, null, null);
}
@Test
public void testDlqAndRetryTransactional() throws Exception {
testDlqGuts(true, null, null, true);
}
@Test
public void testDlq() throws Exception {
testDlqGuts(false, null, 3);
}
@Test
public void testDlqTransactional() throws Exception {
testDlqGuts(false, null, 3, true);
}
@Test
public void testDlqNone() throws Exception {
testDlqGuts(false, HeaderMode.none, 1);
@@ -881,8 +893,19 @@ public class KafkaBinderTests extends
}
private void testDlqGuts(boolean withRetry, HeaderMode headerMode, Integer dlqPartitions) throws Exception {
testDlqGuts(withRetry, headerMode, dlqPartitions, false);
}
private void testDlqGuts(boolean withRetry, HeaderMode headerMode, Integer dlqPartitions,
boolean transactional) throws Exception {
int expectedDlqPartition = dlqPartitions == null ? 0 : dlqPartitions - 1;
KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
if (transactional) {
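// transactions enable the idempotent producer, which requires retries > 0 and acks=all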
binderConfig.getTransaction().setTransactionIdPrefix("tx-");
binderConfig.getTransaction().getProducer().getConfiguration().put(ProducerConfig.RETRIES_CONFIG, "1");
binderConfig.setRequiredAcks("all");
}
DlqPartitionFunction dlqPartitionFunction;
if (Integer.valueOf(1).equals(dlqPartitions)) {
dlqPartitionFunction = null; // test that ZERO_PARTITION is used
@@ -960,11 +983,13 @@ public class KafkaBinderTests extends
final AtomicReference<Message<?>> boundErrorChannelMessage = new AtomicReference<>();
final AtomicReference<Message<?>> globalErrorChannelMessage = new AtomicReference<>();
final AtomicBoolean hasRecovererInCallStack = new AtomicBoolean(!withRetry);
final AtomicBoolean hasAfterRollbackProcessorInStack = new AtomicBoolean(!withRetry);
boundErrorChannel.subscribe(message -> {
boundErrorChannelMessage.set(message);
String stackTrace = Arrays.toString(new RuntimeException().getStackTrace());
hasRecovererInCallStack
.set(stackTrace.contains("ErrorMessageSendingRecoverer"));
hasAfterRollbackProcessorInStack.set(stackTrace.contains("DefaultAfterRollbackProcessor"));
});
globalErrorChannel.subscribe(globalErrorChannelMessage::set);
@@ -1037,10 +1062,21 @@ public class KafkaBinderTests extends
.get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
.isEqualTo(TimestampType.CREATE_TIME.toString().getBytes());
- assertThat(new String((byte[]) receivedMessage.getHeaders()
- .get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
- "Dispatcher failed to deliver Message; nested exception "
- + "is java.lang.RuntimeException: fail");
if (transactional) {
assertThat(new String((byte[]) receivedMessage.getHeaders()
.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
"Transaction rollback limit exceeded");
assertThat(new String((byte[]) receivedMessage.getHeaders()
.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).contains(
"Dispatcher failed to deliver Message; nested exception "
+ "is java.lang.RuntimeException: fail");
}
else {
assertThat(new String((byte[]) receivedMessage.getHeaders()
.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
"Dispatcher failed to deliver Message; nested exception "
+ "is java.lang.RuntimeException: fail");
}
assertThat(receivedMessage.getHeaders()
.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
@@ -1061,7 +1097,8 @@ public class KafkaBinderTests extends
// bridge)
assertThat(boundErrorChannelMessage.get()).isNotNull();
assertThat(globalErrorChannelMessage.get()).isNotNull();
- assertThat(hasRecovererInCallStack.get()).isEqualTo(withRetry);
assertThat(hasRecovererInCallStack.get()).isEqualTo(withRetry && !transactional);
assertThat(hasAfterRollbackProcessorInStack.get()).isEqualTo(transactional);
dlqConsumerBinding.unbind();
consumerBinding.unbind();


@@ -17,8 +17,6 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.Collections;
- import java.util.HashMap;
- import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
@@ -57,15 +55,10 @@ import static org.mockito.Mockito.spy;
*/
public class KafkaTransactionTests {
- private static Map<String, String> brokerProperties = new HashMap<>();
- static {
- brokerProperties.put("transaction.state.log.replication.factor", "1");
- brokerProperties.put("transaction.state.log.min.isr", "1");
- }
@ClassRule
- public static final EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1).brokerProperties(brokerProperties);
public static final EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1)
.brokerProperty("transaction.state.log.replication.factor", "1")
.brokerProperty("transaction.state.log.min.isr", "1");
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test