Native changes required by Kafka Streams binder

This commit is contained in:
Soby Chacko
2021-04-28 12:07:53 -04:00
parent 829ce1cf7e
commit a4ad9e2c0b
6 changed files with 46 additions and 35 deletions

View File

@@ -21,6 +21,7 @@ import java.util.Map;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -43,25 +44,26 @@ import org.springframework.context.annotation.Import;
@Import({ KafkaAutoConfiguration.class,
MultiBinderPropertiesConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class})
@AutoConfigureAfter(KafkaStreamsBinderSupportAutoConfiguration.class)
public class GlobalKTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
public KafkaTopicProvisioner globalKTableProvisioningProvider(
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean
@Bean("globalktable")
public GlobalKTableBinder GlobalKTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaTopicProvisioner globalKTableProvisioningProvider,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
globalKTableProvisioningProvider, kafkaStreamsBindingInformationCatalogue);
globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
return globalKTableBinder;
@@ -69,7 +71,7 @@ public class GlobalKTableBinderConfiguration {
@Bean
@ConditionalOnBean(name = "outerContext")
public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
public static BeanFactoryPostProcessor globalKTableOuterContextBeanFactoryPostProcessor() {
return beanFactory -> {
// It is safe to call getBean("outerContext") here, because this bean is

View File

@@ -17,7 +17,9 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -41,26 +43,27 @@ import org.springframework.context.annotation.Import;
@Import({ KafkaAutoConfiguration.class,
MultiBinderPropertiesConfiguration.class,
KafkaStreamsBinderHealthIndicatorConfiguration.class})
@AutoConfigureAfter(KafkaStreamsBinderSupportAutoConfiguration.class)
public class KStreamBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
public KafkaTopicProvisioner kstreamProvisioningProvider(
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties,
kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean
@Bean("kstream")
public KStreamBinder kStreamBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaTopicProvisioner kstreamProvisioningProvider,
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties,
kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate,
kstreamProvisioningProvider, KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver);
kStreamBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
@@ -69,7 +72,7 @@ public class KStreamBinderConfiguration {
@Bean
@ConditionalOnBean(name = "outerContext")
public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
public static BeanFactoryPostProcessor kstreamOuterContextBeanFactoryPostProcessor() {
return beanFactory -> {
// It is safe to call getBean("outerContext") here, because this bean is

View File

@@ -46,28 +46,28 @@ import org.springframework.context.annotation.Import;
public class KTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
public KafkaTopicProvisioner ktableProvisioningProvider(
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean
@Bean("ktable")
public KTableBinder kTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaTopicProvisioner ktableProvisioningProvider,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
ktableProvisioningProvider, kafkaStreamsBindingInformationCatalogue);
kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
return kTableBinder;
}
@Bean
@ConditionalOnBean(name = "outerContext")
public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
public static BeanFactoryPostProcessor ktableOuterContextBeanFactoryPostProcessor() {
return beanFactory -> {
// It is safe to call getBean("outerContext") here, because this bean is

View File

@@ -1,4 +1,7 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpointAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration:\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpointAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.KStreamBinderConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.KTableBinderConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBinderConfiguration

View File

@@ -50,21 +50,21 @@ public class KafkaStreamsBinderBootstrapTest {
"--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id"
+ "=testKStreamBinderWithCustomEnvironmentCanStart-foobar",
"--spring.cloud.stream.bindings.input-1.destination=foo",
"--spring.cloud.stream.bindings.input-1.binder=kstreamBinder",
"--spring.cloud.stream.binders.kstreamBinder.type=kstream",
"--spring.cloud.stream.binders.kstreamBinder.environment"
"--spring.cloud.stream.bindings.input-1.binder=kstream",
"--spring.cloud.stream.binders.kstream.type=kstream",
"--spring.cloud.stream.binders.kstream.environment"
+ ".spring.cloud.stream.kafka.streams.binder.brokers"
+ "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
"--spring.cloud.stream.bindings.input-2.destination=bar",
"--spring.cloud.stream.bindings.input-2.binder=ktableBinder",
"--spring.cloud.stream.binders.ktableBinder.type=ktable",
"--spring.cloud.stream.binders.ktableBinder.environment"
"--spring.cloud.stream.bindings.input-2.binder=ktable",
"--spring.cloud.stream.binders.ktable.type=ktable",
"--spring.cloud.stream.binders.ktable.environment"
+ ".spring.cloud.stream.kafka.streams.binder.brokers"
+ "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
"--spring.cloud.stream.bindings.input-3.destination=foobar",
"--spring.cloud.stream.bindings.input-3.binder=globalktableBinder",
"--spring.cloud.stream.binders.globalktableBinder.type=globalktable",
"--spring.cloud.stream.binders.globalktableBinder.environment"
"--spring.cloud.stream.bindings.input-3.binder=globalktable",
"--spring.cloud.stream.binders.globalktable.type=globalktable",
"--spring.cloud.stream.binders.globalktable.environment"
+ ".spring.cloud.stream.kafka.streams.binder.brokers"
+ "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString());

View File

@@ -34,7 +34,6 @@ import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.CompositeHealthContributor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -188,9 +187,13 @@ public class KafkaStreamsBinderHealthIndicatorTests {
private static void checkHealth(ConfigurableApplicationContext context,
Status expected) throws InterruptedException {
CompositeHealthContributor healthIndicator = context
.getBean("bindersHealthContributor", CompositeHealthContributor.class);
KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator = (KafkaStreamsBinderHealthIndicator) healthIndicator.getContributor("kstream");
// CompositeHealthContributor healthIndicator = context
// .getBean("bindersHealthContributor", CompositeHealthContributor.class);
KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator = context
.getBean("kafkaStreamsBinderHealthIndicator", KafkaStreamsBinderHealthIndicator.class);
//KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator = (KafkaStreamsBinderHealthIndicator) healthIndicator.getContributor("kstream");
Health health = kafkaStreamsBinderHealthIndicator.health();
while (waitFor(health.getStatus(), health.getDetails())) {
TimeUnit.SECONDS.sleep(2);