From ffde5d35db84f3e3bc92db6e61ed19fc31d31d81 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 5 Jan 2021 15:52:00 -0500 Subject: [PATCH] Kafka Streams binder health indicator improvements When using multi binder setup in Kafka Streams binder, there is an issue in which the binder health indicator is not getting bootstapped due to a ConditionalOnBean is unable to find a match for KafkaStreamsRegistry bean. Fixing this issue by using an ObjectProvider instead of ConditionalOnBean. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1006 --- .../streams/GlobalKTableBinderConfiguration.java | 7 +++++-- .../kafka/streams/KStreamBinderConfiguration.java | 7 +++++-- .../kafka/streams/KTableBinderConfiguration.java | 7 +++++-- ...aStreamsBinderHealthIndicatorConfiguration.java | 14 ++++++++------ .../MultiBinderPropertiesConfiguration.java | 2 +- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java index 35e12240..9153f0d8 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java @@ -39,8 +39,8 @@ import org.springframework.context.annotation.Import; */ @Configuration @Import({ KafkaAutoConfiguration.class, - KafkaStreamsBinderHealthIndicatorConfiguration.class, - MultiBinderPropertiesConfiguration.class}) + MultiBinderPropertiesConfiguration.class, + KafkaStreamsBinderHealthIndicatorConfiguration.class}) public class GlobalKTableBinderConfiguration { @Bean @@ -81,6 +81,9 @@ public class GlobalKTableBinderConfiguration { beanFactory.registerSingleton( KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext.getBean(KafkaStreamsBindingInformationCatalogue.class)); + beanFactory.registerSingleton( + KafkaStreamsRegistry.class.getSimpleName(), + outerContext.getBean(KafkaStreamsRegistry.class)); }; } diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java index 254bfb53..21de23e3 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java @@ -37,8 +37,8 @@ import org.springframework.context.annotation.Import; */ @Configuration @Import({ KafkaAutoConfiguration.class, - KafkaStreamsBinderHealthIndicatorConfiguration.class, - MultiBinderPropertiesConfiguration.class}) + MultiBinderPropertiesConfiguration.class, + KafkaStreamsBinderHealthIndicatorConfiguration.class}) public class KStreamBinderConfiguration { @Bean @@ -86,6 +86,9 @@ public class KStreamBinderConfiguration { beanFactory.registerSingleton( KafkaStreamsExtendedBindingProperties.class.getSimpleName(), outerContext.getBean(KafkaStreamsExtendedBindingProperties.class)); + beanFactory.registerSingleton( + KafkaStreamsRegistry.class.getSimpleName(), + outerContext.getBean(KafkaStreamsRegistry.class)); }; } diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java index 4281bf65..0ef9fa6e 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java @@ -39,8 +39,8 @@ import org.springframework.context.annotation.Import; @SuppressWarnings("ALL") @Configuration @Import({ KafkaAutoConfiguration.class, - KafkaStreamsBinderHealthIndicatorConfiguration.class, - MultiBinderPropertiesConfiguration.class}) + MultiBinderPropertiesConfiguration.class, + KafkaStreamsBinderHealthIndicatorConfiguration.class}) public class KTableBinderConfiguration { @Bean @@ -79,6 +79,9 @@ public class KTableBinderConfiguration { beanFactory.registerSingleton( KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext.getBean(KafkaStreamsBindingInformationCatalogue.class)); + beanFactory.registerSingleton( + KafkaStreamsRegistry.class.getSimpleName(), + outerContext.getBean(KafkaStreamsRegistry.class)); }; } diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderHealthIndicatorConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderHealthIndicatorConfiguration.java index 46a7c2c1..55ecf2b0 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderHealthIndicatorConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderHealthIndicatorConfiguration.java @@ -16,9 +16,9 @@ package org.springframework.cloud.stream.binder.kafka.streams; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; @@ -36,13 +36,15 @@ import org.springframework.context.annotation.Configuration; class KafkaStreamsBinderHealthIndicatorConfiguration { @Bean - @ConditionalOnBean(KafkaStreamsRegistry.class) KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator( - KafkaStreamsRegistry kafkaStreamsRegistry, @Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, + ObjectProvider kafkaStreamsRegistry, + @Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, KafkaProperties kafkaProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) { - - return new KafkaStreamsBinderHealthIndicator(kafkaStreamsRegistry, kafkaStreamsBinderConfigurationProperties, - kafkaProperties, kafkaStreamsBindingInformationCatalogue); + if (kafkaStreamsRegistry.getIfUnique() != null) { + return new KafkaStreamsBinderHealthIndicator(kafkaStreamsRegistry.getIfUnique(), kafkaStreamsBinderConfigurationProperties, + kafkaProperties, kafkaStreamsBindingInformationCatalogue); + } + return null; } } diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java index 176577f8..466cfdf3 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/MultiBinderPropertiesConfiguration.java @@ -34,7 +34,7 @@ public class MultiBinderPropertiesConfiguration { @Bean @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder") @ConditionalOnBean(name = "outerContext") - public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) { + public KafkaBinderConfigurationProperties binderConfigurationProperties(KafkaProperties kafkaProperties) { return new KafkaStreamsBinderConfigurationProperties(kafkaProperties); } }