diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index b3dca239..367ae107 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -969,3 +969,28 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() { }; } ``` + +[[custom-kafka-binder-health-indicator]] +=== Custom Kafka Binder Health Indicator + +Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath. +This health indicator checks the health of the binder and any communication issues with the Kafka broker. +If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface. +`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`. +In the custom implementation, it must provide an implementation for the `health()` method. +The custom implementation must be present in the application configuration as a bean. +When the binder discovers the custom implementation, it will use that instead of the default implementation. +Here is an example of such a custom implementation bean in the application. + +``` +@Bean +public KafkaBinderHealth kafkaBinderHealthIndicator() { + return new KafkaBinderHealth() { + @Override + public Health health() { + // custom implementation details. + } + }; +} +``` + diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java new file mode 100644 index 00000000..dd6e6b6b --- /dev/null +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java @@ -0,0 +1,29 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import org.springframework.boot.actuate.health.HealthIndicator; + +/** + * Marker interface used for custom KafkaBinderHealth indicator implementations. + * + * @author Soby Chacko + * @since 3.2.2 + */ +public interface KafkaBinderHealth extends HealthIndicator { + +} diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java index 594ca55c..e35d1529 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,6 @@ import org.apache.kafka.common.PartitionInfo; import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.boot.actuate.health.Status; import org.springframework.boot.actuate.health.StatusAggregator; import org.springframework.kafka.core.ConsumerFactory; @@ -55,7 +54,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory; * @author Chukwubuikem Ume-Ugwa * @author Taras Danylchuk */ -public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean { +public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean { private static final int DEFAULT_TIMEOUT = 60; @@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe private boolean considerDownWhenAnyPartitionHasNoLeader; public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, - ConsumerFactory consumerFactory) { + ConsumerFactory consumerFactory) { this.binder = binder; this.consumerFactory = consumerFactory; } @@ -219,7 +218,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe } @Override - public void destroy() throws Exception { + public void destroy() { executor.shutdown(); } diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java index dd2dc570..67d53a18 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth; import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; @@ -38,11 +40,13 @@ import org.springframework.util.ObjectUtils; * * @author Oleg Zhurakousky * @author Chukwubuikem Ume-Ugwa + * @author Soby Chacko */ -@Configuration +@Configuration(proxyBeanMethods = false) @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator") @ConditionalOnEnabledHealthIndicator("binders") +@ConditionalOnMissingBean(KafkaBinderHealth.class) public class KafkaBinderHealthIndicatorConfiguration { @Bean diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java new file mode 100644 index 00000000..49a6c170 --- /dev/null +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.integration2; + +import org.junit.ClassRule; +import org.junit.Test; + +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** + * @author Soby Chacko + */ +public class KafkaBinderCustomHealthCheckTests { + + @ClassRule + public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10); + + @Test + public void testCustomHealthIndicatorIsActivated() { + ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( + CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run( + "--spring.cloud.stream.kafka.binder.brokers=" + + embeddedKafka.getEmbeddedKafka().getBrokersAsString()); + final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class); + assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class); + assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class); + applicationContext.close(); + } + + @SpringBootApplication + static class CustomHealthCheckApplication { + + @Bean + public CustomHealthIndicator kafkaBinderHealthIndicator() { + return new CustomHealthIndicator(); + } + } + + static class CustomHealthIndicator implements KafkaBinderHealth { + + @Override + public Health health() { + return null; + } + } +}