Offset commit when DLQ is enabled and manual ack (#871)

* Offset commit when DLQ is enabled and manual ack

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870

When an error occurs, if the application uses manual acknowldegment
(i.e. autoCommitOffset is false) and DLQ is enabled, then after
publishing to DLQ, the offset is not committed currently.
Addressing this issue by manually commiting after publishing to DLQ.

* Address PR review comments

* Addressing PR review comments - #2
This commit is contained in:
Soby Chacko
2020-03-24 16:28:39 -04:00
parent a5b1d30c1c
commit b329ad133a
2 changed files with 108 additions and 4 deletions

View File

@@ -103,6 +103,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
@@ -112,6 +113,7 @@ import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
@@ -214,6 +216,8 @@ public class KafkaMessageChannelBinder extends
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
public KafkaMessageChannelBinder(
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
@@ -681,6 +685,7 @@ public class KafkaMessageChannelBinder extends
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
return kafkaMessageDrivenChannelAdapter;
}
@@ -1156,16 +1161,31 @@ public class KafkaMessageChannelBinder extends
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
? kafkaConsumerProperties.getDlqName()
: "error." + record.topic() + "." + group;
MessageHeaders headers;
if (message instanceof ErrorMessage) {
final ErrorMessage errorMessage = (ErrorMessage) message;
final Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
headers = originalMessage.getHeaders();
}
else {
headers = message.getHeaders();
}
}
else {
headers = message.getHeaders();
}
if (this.transactionTemplate != null) {
Throwable throwable2 = throwable;
this.transactionTemplate.executeWithoutResult(status -> {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()),
headers, this.ackModeInfo.get(destination));
});
}
else {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()), headers, this.ackModeInfo.get(destination));
}
};
}
@@ -1428,7 +1448,8 @@ public class KafkaMessageChannelBinder extends
@SuppressWarnings("unchecked")
void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers,
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction) {
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction,
MessageHeaders messageHeaders, ContainerProperties.AckMode ackMode) {
K key = (K) consumerRecord.key();
V value = (V) consumerRecord.value();
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName,
@@ -1458,6 +1479,9 @@ public class KafkaMessageChannelBinder extends
KafkaMessageChannelBinder.this.logger
.debug("Sent to DLQ " + sb.toString());
}
if (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
}
}
});
}

View File

@@ -1190,7 +1190,6 @@ public class KafkaBinderTests extends
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
@@ -1252,6 +1251,87 @@ public class KafkaBinderTests extends
producerBinding.unbind();
}
//See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
@Test
@SuppressWarnings("unchecked")
public void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
BindingProperties producerBindingProperties = createProducerBindingProperties(
producerProperties);
DirectChannel moduleOutputChannel = createBindableChannel("output",
producerBindingProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
//When auto commit is disabled, then the record is committed after publishing to DLQ using the manual acknowledgement.
// (if DLQ is enabled, which is, in this case).
consumerProperties.getExtension().setAutoCommitOffset(false);
consumerProperties.getExtension().setEnableDlq(true);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer(
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
producerProperties);
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
consumerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
QueueChannel dlqChannel = new QueueChannel();
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
dlqConsumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder
.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
Message<?> dlqMessage = receive(dlqChannel, 3);
assertThat(dlqMessage).isNotNull();
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
.next().getValue();
assertThat(handledMessage).isNotNull();
assertThat(
new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
.isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount())
.isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
consumerBinding.unbind();
// on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
QueueChannel successfulInputChannel = new QueueChannel();
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
"testGroup", successfulInputChannel, consumerProperties);
String testMessage2Payload = "test1." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder
.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> receivedMessage = receive(successfulInputChannel);
assertThat(receivedMessage.getPayload())
.isEqualTo(testMessage2Payload.getBytes());
binderBindUnbindLatency();
consumerBinding.unbind();
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testConfigurableDlqName() throws Exception {