From d345ac88b1a3682f3e1297da9ced1dfc1b2e45eb Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 12 Jan 2022 17:12:01 -0500 Subject: [PATCH] Enable custom binder health check impelementation Currently, KafkaBinderHealthIndicator is not customizable and included by default when Spring Boot actuator is on the classpath. Fix this by allowing the application to provide a custom implementation. A new marker interface called KafkaBinderHealth can be used by the applicaiton to provide a custom HealthIndicator implementation, in which case, the binder's default implementation will be excluded. Tests and docs changes. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1180 --- docs/src/main/asciidoc/overview.adoc | 25 +++++++ .../binder/kafka/KafkaBinderHealth.java | 29 ++++++++ .../kafka/KafkaBinderHealthIndicator.java | 9 ++- ...fkaBinderHealthIndicatorConfiguration.java | 8 ++- .../KafkaBinderCustomHealthCheckTests.java | 72 +++++++++++++++++++ 5 files changed, 136 insertions(+), 7 deletions(-) create mode 100644 spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java create mode 100644 spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index b3dca239..367ae107 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -969,3 +969,28 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() { }; } ``` + +[[custom-kafka-binder-health-indicator]] +=== Custom Kafka Binder Health Indicator + +Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath. +This health indicator checks the health of the binder and any communication issues with the Kafka broker. +If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface. +`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`. +In the custom implementation, it must provide an implementation for the `health()` method. +The custom implementation must be present in the application configuration as a bean. +When the binder discovers the custom implementation, it will use that instead of the default implementation. +Here is an example of such a custom implementation bean in the application. + +``` +@Bean +public KafkaBinderHealth kafkaBinderHealthIndicator() { + return new KafkaBinderHealth() { + @Override + public Health health() { + // custom implementation details. + } + }; +} +``` + diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java new file mode 100644 index 00000000..dd6e6b6b --- /dev/null +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealth.java @@ -0,0 +1,29 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import org.springframework.boot.actuate.health.HealthIndicator; + +/** + * Marker interface used for custom KafkaBinderHealth indicator implementations. + * + * @author Soby Chacko + * @since 3.2.2 + */ +public interface KafkaBinderHealth extends HealthIndicator { + +} diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java index 594ca55c..e35d1529 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,6 @@ import org.apache.kafka.common.PartitionInfo; import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.boot.actuate.health.Status; import org.springframework.boot.actuate.health.StatusAggregator; import org.springframework.kafka.core.ConsumerFactory; @@ -55,7 +54,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory; * @author Chukwubuikem Ume-Ugwa * @author Taras Danylchuk */ -public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean { +public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean { private static final int DEFAULT_TIMEOUT = 60; @@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe private boolean considerDownWhenAnyPartitionHasNoLeader; public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, - ConsumerFactory consumerFactory) { + ConsumerFactory consumerFactory) { this.binder = binder; this.consumerFactory = consumerFactory; } @@ -219,7 +218,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe } @Override - public void destroy() throws Exception { + public void destroy() { executor.shutdown(); } diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java index dd2dc570..67d53a18 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth; import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; @@ -38,11 +40,13 @@ import org.springframework.util.ObjectUtils; * * @author Oleg Zhurakousky * @author Chukwubuikem Ume-Ugwa + * @author Soby Chacko */ -@Configuration +@Configuration(proxyBeanMethods = false) @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator") @ConditionalOnEnabledHealthIndicator("binders") +@ConditionalOnMissingBean(KafkaBinderHealth.class) public class KafkaBinderHealthIndicatorConfiguration { @Bean diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java new file mode 100644 index 00000000..49a6c170 --- /dev/null +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/KafkaBinderCustomHealthCheckTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.integration2; + +import org.junit.ClassRule; +import org.junit.Test; + +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth; +import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** + * @author Soby Chacko + */ +public class KafkaBinderCustomHealthCheckTests { + + @ClassRule + public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10); + + @Test + public void testCustomHealthIndicatorIsActivated() { + ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( + CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run( + "--spring.cloud.stream.kafka.binder.brokers=" + + embeddedKafka.getEmbeddedKafka().getBrokersAsString()); + final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class); + assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class); + assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class); + applicationContext.close(); + } + + @SpringBootApplication + static class CustomHealthCheckApplication { + + @Bean + public CustomHealthIndicator kafkaBinderHealthIndicator() { + return new CustomHealthIndicator(); + } + } + + static class CustomHealthIndicator implements KafkaBinderHealth { + + @Override + public Health health() { + return null; + } + } +}