Kafka Streams binder health indicator improvements

When using multi binder setup in Kafka Streams binder, there is an issue
in which the binder health indicator is not getting bootstapped due to a
ConditionalOnBean is unable to find a match for KafkaStreamsRegistry bean.
Fixing this issue by using an ObjectProvider instead of ConditionalOnBean.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1006
This commit is contained in:
Soby Chacko
2021-01-05 15:52:00 -05:00
parent 5921d464ed
commit ffde5d35db
5 changed files with 24 additions and 13 deletions

View File

@@ -39,8 +39,8 @@ import org.springframework.context.annotation.Import;
*/
@Configuration
@Import({ KafkaAutoConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class,
MultiBinderPropertiesConfiguration.class})
MultiBinderPropertiesConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class})
public class GlobalKTableBinderConfiguration {
@Bean
@@ -81,6 +81,9 @@ public class GlobalKTableBinderConfiguration {
beanFactory.registerSingleton(
KafkaStreamsBindingInformationCatalogue.class.getSimpleName(),
outerContext.getBean(KafkaStreamsBindingInformationCatalogue.class));
beanFactory.registerSingleton(
KafkaStreamsRegistry.class.getSimpleName(),
outerContext.getBean(KafkaStreamsRegistry.class));
};
}

View File

@@ -37,8 +37,8 @@ import org.springframework.context.annotation.Import;
*/
@Configuration
@Import({ KafkaAutoConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class,
MultiBinderPropertiesConfiguration.class})
MultiBinderPropertiesConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class})
public class KStreamBinderConfiguration {
@Bean
@@ -86,6 +86,9 @@ public class KStreamBinderConfiguration {
beanFactory.registerSingleton(
KafkaStreamsExtendedBindingProperties.class.getSimpleName(),
outerContext.getBean(KafkaStreamsExtendedBindingProperties.class));
beanFactory.registerSingleton(
KafkaStreamsRegistry.class.getSimpleName(),
outerContext.getBean(KafkaStreamsRegistry.class));
};
}

View File

@@ -39,8 +39,8 @@ import org.springframework.context.annotation.Import;
@SuppressWarnings("ALL")
@Configuration
@Import({ KafkaAutoConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class,
MultiBinderPropertiesConfiguration.class})
MultiBinderPropertiesConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class})
public class KTableBinderConfiguration {
@Bean
@@ -79,6 +79,9 @@ public class KTableBinderConfiguration {
beanFactory.registerSingleton(
KafkaStreamsBindingInformationCatalogue.class.getSimpleName(),
outerContext.getBean(KafkaStreamsBindingInformationCatalogue.class));
beanFactory.registerSingleton(
KafkaStreamsRegistry.class.getSimpleName(),
outerContext.getBean(KafkaStreamsRegistry.class));
};
}

View File

@@ -16,9 +16,9 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
@@ -36,13 +36,15 @@ import org.springframework.context.annotation.Configuration;
class KafkaStreamsBinderHealthIndicatorConfiguration {
@Bean
@ConditionalOnBean(KafkaStreamsRegistry.class)
KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator(
KafkaStreamsRegistry kafkaStreamsRegistry, @Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
ObjectProvider<KafkaStreamsRegistry> kafkaStreamsRegistry,
@Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
return new KafkaStreamsBinderHealthIndicator(kafkaStreamsRegistry, kafkaStreamsBinderConfigurationProperties,
kafkaProperties, kafkaStreamsBindingInformationCatalogue);
if (kafkaStreamsRegistry.getIfUnique() != null) {
return new KafkaStreamsBinderHealthIndicator(kafkaStreamsRegistry.getIfUnique(), kafkaStreamsBinderConfigurationProperties,
kafkaProperties, kafkaStreamsBindingInformationCatalogue);
}
return null;
}
}

View File

@@ -34,7 +34,7 @@ public class MultiBinderPropertiesConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
@ConditionalOnBean(name = "outerContext")
public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) {
public KafkaBinderConfigurationProperties binderConfigurationProperties(KafkaProperties kafkaProperties) {
return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
}
}