From a4ad9e2c0bf50c02044b64ae0cf406928f59e20c Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 28 Apr 2021 12:07:53 -0400 Subject: [PATCH] Native changes required by Kafka Streams binder --- .../GlobalKTableBinderConfiguration.java | 14 ++++++++------ .../streams/KStreamBinderConfiguration.java | 15 +++++++++------ .../streams/KTableBinderConfiguration.java | 12 ++++++------ .../main/resources/META-INF/spring.factories | 11 +++++++---- .../KafkaStreamsBinderBootstrapTest.java | 18 +++++++++--------- ...KafkaStreamsBinderHealthIndicatorTests.java | 11 +++++++---- 6 files changed, 46 insertions(+), 35 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java index 7eae7953..81a69584 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java @@ -21,6 +21,7 @@ import java.util.Map; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -43,25 +44,26 @@ import org.springframework.context.annotation.Import; @Import({ KafkaAutoConfiguration.class, MultiBinderPropertiesConfiguration.class, KafkaStreamsBinderHealthIndicatorConfiguration.class}) +@AutoConfigureAfter(KafkaStreamsBinderSupportAutoConfiguration.class) public class GlobalKTableBinderConfiguration { @Bean - public KafkaTopicProvisioner provisioningProvider( - KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, + public KafkaTopicProvisioner globalKTableProvisioningProvider( + @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } - @Bean + @Bean("globalktable") public GlobalKTableBinder GlobalKTableBinder( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaTopicProvisioner kafkaTopicProvisioner, + KafkaTopicProvisioner globalKTableProvisioningProvider, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, @Qualifier("streamConfigGlobalProperties") Map streamConfigGlobalProperties) { GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties, - kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue); + globalKTableProvisioningProvider, kafkaStreamsBindingInformationCatalogue); globalKTableBinder.setKafkaStreamsExtendedBindingProperties( kafkaStreamsExtendedBindingProperties); return globalKTableBinder; @@ -69,7 +71,7 @@ public class GlobalKTableBinderConfiguration { @Bean @ConditionalOnBean(name = "outerContext") - public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() { + public static BeanFactoryPostProcessor globalKTableOuterContextBeanFactoryPostProcessor() { return beanFactory -> { // It is safe to call getBean("outerContext") here, because this bean is diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java index 9450bc39..a7ee25cc 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java @@ -17,7 +17,9 @@ package org.springframework.cloud.stream.binder.kafka.streams; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -41,26 +43,27 @@ import org.springframework.context.annotation.Import; @Import({ KafkaAutoConfiguration.class, MultiBinderPropertiesConfiguration.class, KafkaStreamsBinderHealthIndicatorConfiguration.class}) +@AutoConfigureAfter(KafkaStreamsBinderSupportAutoConfiguration.class) public class KStreamBinderConfiguration { @Bean - public KafkaTopicProvisioner provisioningProvider( - KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, + public KafkaTopicProvisioner kstreamProvisioningProvider( + @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } - @Bean + @Bean("kstream") public KStreamBinder kStreamBinder( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaTopicProvisioner kafkaTopicProvisioner, + KafkaTopicProvisioner kstreamProvisioningProvider, KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) { KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties, - kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate, + kstreamProvisioningProvider, KafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver); kStreamBinder.setKafkaStreamsExtendedBindingProperties( kafkaStreamsExtendedBindingProperties); @@ -69,7 +72,7 @@ public class KStreamBinderConfiguration { @Bean @ConditionalOnBean(name = "outerContext") - public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() { + public static BeanFactoryPostProcessor kstreamOuterContextBeanFactoryPostProcessor() { return beanFactory -> { // It is safe to call getBean("outerContext") here, because this bean is diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java index 825e4a2a..4da804e1 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java @@ -46,28 +46,28 @@ import org.springframework.context.annotation.Import; public class KTableBinderConfiguration { @Bean - public KafkaTopicProvisioner provisioningProvider( - KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, + public KafkaTopicProvisioner ktableProvisioningProvider( + @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KafkaProperties kafkaProperties, ObjectProvider adminClientConfigCustomizer) { return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique()); } - @Bean + @Bean("ktable") public KTableBinder kTableBinder( KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, - KafkaTopicProvisioner kafkaTopicProvisioner, + KafkaTopicProvisioner ktableProvisioningProvider, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, @Qualifier("streamConfigGlobalProperties") Map streamConfigGlobalProperties) { KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties, - kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue); + ktableProvisioningProvider, kafkaStreamsBindingInformationCatalogue); kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties); return kTableBinder; } @Bean @ConditionalOnBean(name = "outerContext") - public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() { + public static BeanFactoryPostProcessor ktableOuterContextBeanFactoryPostProcessor() { return beanFactory -> { // It is safe to call getBean("outerContext") here, because this bean is diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring.factories index ca6492f0..8f59c3d6 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring.factories @@ -1,4 +1,7 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ - org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\ - org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\ - org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpointAutoConfiguration +org.springframework.boot.autoconfigure.EnableAutoConfiguration:\ +org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\ +org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\ +org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpointAutoConfiguration,\ +org.springframework.cloud.stream.binder.kafka.streams.KStreamBinderConfiguration,\ +org.springframework.cloud.stream.binder.kafka.streams.KTableBinderConfiguration,\ +org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBinderConfiguration diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index 5ed9c0b6..1d8a28be 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -50,21 +50,21 @@ public class KafkaStreamsBinderBootstrapTest { "--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id" + "=testKStreamBinderWithCustomEnvironmentCanStart-foobar", "--spring.cloud.stream.bindings.input-1.destination=foo", - "--spring.cloud.stream.bindings.input-1.binder=kstreamBinder", - "--spring.cloud.stream.binders.kstreamBinder.type=kstream", - "--spring.cloud.stream.binders.kstreamBinder.environment" + "--spring.cloud.stream.bindings.input-1.binder=kstream", + "--spring.cloud.stream.binders.kstream.type=kstream", + "--spring.cloud.stream.binders.kstream.environment" + ".spring.cloud.stream.kafka.streams.binder.brokers" + "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(), "--spring.cloud.stream.bindings.input-2.destination=bar", - "--spring.cloud.stream.bindings.input-2.binder=ktableBinder", - "--spring.cloud.stream.binders.ktableBinder.type=ktable", - "--spring.cloud.stream.binders.ktableBinder.environment" + "--spring.cloud.stream.bindings.input-2.binder=ktable", + "--spring.cloud.stream.binders.ktable.type=ktable", + "--spring.cloud.stream.binders.ktable.environment" + ".spring.cloud.stream.kafka.streams.binder.brokers" + "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(), "--spring.cloud.stream.bindings.input-3.destination=foobar", - "--spring.cloud.stream.bindings.input-3.binder=globalktableBinder", - "--spring.cloud.stream.binders.globalktableBinder.type=globalktable", - "--spring.cloud.stream.binders.globalktableBinder.environment" + "--spring.cloud.stream.bindings.input-3.binder=globalktable", + "--spring.cloud.stream.binders.globalktable.type=globalktable", + "--spring.cloud.stream.binders.globalktable.environment" + ".spring.cloud.stream.kafka.streams.binder.brokers" + "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString()); diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderHealthIndicatorTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderHealthIndicatorTests.java index 54dcf9d5..e0dd96e4 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderHealthIndicatorTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderHealthIndicatorTests.java @@ -34,7 +34,6 @@ import org.junit.Test; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; -import org.springframework.boot.actuate.health.CompositeHealthContributor; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -188,9 +187,13 @@ public class KafkaStreamsBinderHealthIndicatorTests { private static void checkHealth(ConfigurableApplicationContext context, Status expected) throws InterruptedException { - CompositeHealthContributor healthIndicator = context - .getBean("bindersHealthContributor", CompositeHealthContributor.class); - KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator = (KafkaStreamsBinderHealthIndicator) healthIndicator.getContributor("kstream"); +// CompositeHealthContributor healthIndicator = context +// .getBean("bindersHealthContributor", CompositeHealthContributor.class); + + KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator = context + .getBean("kafkaStreamsBinderHealthIndicator", KafkaStreamsBinderHealthIndicator.class); + + //KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator = (KafkaStreamsBinderHealthIndicator) healthIndicator.getContributor("kstream"); Health health = kafkaStreamsBinderHealthIndicator.health(); while (waitFor(health.getStatus(), health.getDetails())) { TimeUnit.SECONDS.sleep(2);