GH-1140: CommonErrorHandler per consumer binding (#1143)
* GH-1140: CommonErrorHandler per consumer binding Setting CommonErrorHandler on consumer binding through its bean name. If present, binder will resolve this bean and assign it on the listener container. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1140 * Addressing PR review
This commit is contained in:
@@ -321,6 +321,12 @@ When using a transactional binder, the offset of a recovered record (e.g. when r
|
||||
Setting this property to `false` suppresses committing the offset of recovered record.
|
||||
+
|
||||
Default: true.
|
||||
commonErrorHandlerBeanName::
|
||||
`CommonErrorHandler` bean name to use per consumer binding.
|
||||
When present, this user provided `CommonErrorHandler` takes precedence over any other error handlers defined by the binder.
|
||||
This is a handy way to express error handlers, if the application does not want to use a `ListenerContainerCustomizer` and then check the destination/group combination to set an error handler.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[reset-offsets]]
|
||||
==== Resetting Offsets
|
||||
|
||||
@@ -210,6 +210,12 @@ public class KafkaConsumerProperties {
|
||||
*/
|
||||
private boolean txCommitRecovered = true;
|
||||
|
||||
/**
|
||||
* CommonErrorHandler bean name per consumer binding.
|
||||
* @since 3.2
|
||||
*/
|
||||
private String commonErrorHandlerBeanName;
|
||||
|
||||
/**
|
||||
* @return if each record needs to be acknowledged.
|
||||
*
|
||||
@@ -529,4 +535,11 @@ public class KafkaConsumerProperties {
|
||||
this.txCommitRecovered = txCommitRecovered;
|
||||
}
|
||||
|
||||
public String getCommonErrorHandlerBeanName() {
|
||||
return commonErrorHandlerBeanName;
|
||||
}
|
||||
|
||||
public void setCommonErrorHandlerBeanName(String commonErrorHandlerBeanName) {
|
||||
this.commonErrorHandlerBeanName = commonErrorHandlerBeanName;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,6 +102,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||
import org.springframework.kafka.listener.ConsumerProperties;
|
||||
@@ -777,6 +778,12 @@ public class KafkaMessageChannelBinder extends
|
||||
else {
|
||||
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
|
||||
}
|
||||
final String commonErrorHandlerBeanName = extendedConsumerProperties.getExtension().getCommonErrorHandlerBeanName();
|
||||
if (StringUtils.hasText(commonErrorHandlerBeanName)) {
|
||||
final CommonErrorHandler commonErrorHandler = getApplicationContext().getBean(commonErrorHandlerBeanName,
|
||||
CommonErrorHandler.class);
|
||||
messageListenerContainer.setCommonErrorHandler(commonErrorHandler);
|
||||
}
|
||||
this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
|
||||
this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
|
||||
return kafkaMessageDrivenChannelAdapter;
|
||||
|
||||
@@ -118,8 +118,11 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ContainerProperties;
|
||||
import org.springframework.kafka.listener.DefaultErrorHandler;
|
||||
import org.springframework.kafka.listener.MessageListenerContainer;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
@@ -145,6 +148,7 @@ import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.backoff.FixedBackOff;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.SettableListenableFuture;
|
||||
|
||||
@@ -1343,6 +1347,68 @@ public class KafkaBinderTests extends
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCommonErrorHandlerBeanNameOnConsumerBinding() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(
|
||||
producerProperties);
|
||||
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
producerBindingProperties);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CommonErrorHandler commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0L)) {
|
||||
@Override
|
||||
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
|
||||
Consumer<?, ?> consumer, MessageListenerContainer container) {
|
||||
super.handleRemaining(thrownException, records, consumer, container);
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
ConfigurableApplicationContext context = TestUtils.getPropertyValue(binder,
|
||||
"binder.applicationContext", ConfigurableApplicationContext.class);
|
||||
context.getBeanFactory().registerSingleton("fooCommonErrorHandler", commonErrorHandler);
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setMaxAttempts(2);
|
||||
consumerProperties.setBackOffInitialInterval(100);
|
||||
consumerProperties.setBackOffMaxInterval(150);
|
||||
consumerProperties.getExtension().setCommonErrorHandlerBeanName("fooCommonErrorHandler");
|
||||
|
||||
DirectChannel moduleInputChannel = createBindableChannel("input",
|
||||
createConsumerBindingProperties(consumerProperties));
|
||||
|
||||
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
|
||||
moduleInputChannel.subscribe(handler);
|
||||
long uniqueBindingId = System.currentTimeMillis();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(
|
||||
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
|
||||
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
|
||||
consumerProperties);
|
||||
|
||||
String testMessagePayload = "test." + UUID.randomUUID();
|
||||
Message<byte[]> testMessage = MessageBuilder
|
||||
.withPayload(testMessagePayload.getBytes()).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
Thread.sleep(3000);
|
||||
|
||||
//Assertions for the CommonErrorHandler configured on the consumer binding (commonErrorHandlerBeanName).
|
||||
assertThat(KafkaTestUtils.getPropertyValue(consumerBinding,
|
||||
"lifecycle.messageListenerContainer.commonErrorHandler")).isSameAs(commonErrorHandler);
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
binderBindUnbindLatency();
|
||||
consumerBinding.unbind();
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
|
||||
//See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
Reference in New Issue
Block a user