Enable KafkaStreams bootstrap tests

Remove the usage of `ImportBeanDefinitionRegistrar` in Kafka Streams binder
components since the regular use of getBean from the outer context is safe to do so.

Unignore tests

Resolves #501

* Addressing PR review comments
This commit is contained in:
Soby Chacko
2018-11-27 16:54:31 -05:00
committed by Artem Bilan
parent bf1c366d9b
commit 1866feb75f
5 changed files with 49 additions and 93 deletions

View File

@@ -19,13 +19,15 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* Configuration for GlobalKTable binder.
@@ -34,9 +36,22 @@ import org.springframework.context.annotation.Import;
* @since 2.1.0
*/
@Configuration
@Import(KafkaStreamsBinderUtils.KafkaStreamsMissingBeansRegistrar.class)
public class GlobalKTableBinderConfiguration {
@Bean
@ConditionalOnBean(name = "outerContext")
public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
return (beanFactory) -> {
// It is safe to call getBean("outerContext") here, because this bean is registered as first
// and as independent from the parent context.
ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBinderConfigurationProperties.class));
beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBindingInformationCatalogue.class));
};
}
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {

View File

@@ -19,20 +19,17 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.MethodInvokingFactoryBean;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.type.AnnotationMetadata;
/**
* Configuration for KStream binder.
@@ -42,13 +39,14 @@ import org.springframework.core.type.AnnotationMetadata;
* @author Soby Chacko
*/
@Configuration
@Import({KafkaAutoConfiguration.class, KStreamBinderConfiguration.KStreamMissingBeansRegistrar.class})
@Import({KafkaAutoConfiguration.class})
public class KStreamBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
public KafkaTopicProvisioner provisioningProvider(KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties, kafkaProperties);
}
@Bean
@@ -66,45 +64,25 @@ public class KStreamBinderConfiguration {
return kStreamBinder;
}
/**
* Registrar for missing beans when there are multiple binders in the application.
*/
static class KStreamMissingBeansRegistrar extends KafkaStreamsBinderUtils.KafkaStreamsMissingBeansRegistrar {
@Bean
@ConditionalOnBean(name = "outerContext")
public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
return beanFactory -> {
private static final String BEAN_NAME = "outerContext";
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
super.registerBeanDefinitions(importingClassMetadata, registry);
if (registry.containsBeanDefinition(BEAN_NAME)) {
AbstractBeanDefinition converstionDelegateBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsMessageConversionDelegate.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsMessageConversionDelegate.class.getSimpleName(), converstionDelegateBean);
AbstractBeanDefinition keyValueSerdeResolverBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KeyValueSerdeResolver.class)
.getBeanDefinition();
registry.registerBeanDefinition(KeyValueSerdeResolver.class.getSimpleName(), keyValueSerdeResolverBean);
AbstractBeanDefinition kafkaStreamsExtendedBindingPropertiesBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsExtendedBindingProperties.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsExtendedBindingProperties.class.getSimpleName(), kafkaStreamsExtendedBindingPropertiesBean);
}
}
// It is safe to call getBean("outerContext") here, because this bean is registered as first
// and as independent from the parent context.
ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBinderConfigurationProperties.class));
beanFactory.registerSingleton(KafkaStreamsMessageConversionDelegate.class.getSimpleName(), outerContext
.getBean(KafkaStreamsMessageConversionDelegate.class));
beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBindingInformationCatalogue.class));
beanFactory.registerSingleton(KeyValueSerdeResolver.class.getSimpleName(), outerContext
.getBean(KeyValueSerdeResolver.class));
beanFactory.registerSingleton(KafkaStreamsExtendedBindingProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsExtendedBindingProperties.class));
};
}
}

View File

@@ -28,7 +28,6 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* Configuration for KTable binder.
@@ -37,13 +36,14 @@ import org.springframework.context.annotation.Import;
*/
@SuppressWarnings("ALL")
@Configuration
@Import(KafkaStreamsBinderUtils.KafkaStreamsMissingBeansRegistrar.class)
public class KTableBinderConfiguration {
@Bean
@ConditionalOnBean(name = "outerContext")
public BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
public static BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
return (beanFactory) -> {
// It is safe to call getBean("outerContext") here, because this bean is registered as first
// and as independent from the parent context.
ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBinderConfigurationProperties.class));

View File

@@ -18,18 +18,12 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
import org.springframework.beans.factory.config.MethodInvokingFactoryBean;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.StringUtils;
/**
@@ -78,35 +72,4 @@ final class KafkaStreamsBinderUtils {
}
}
/**
* Helper lass for missing bean registration.
*/
static class KafkaStreamsMissingBeansRegistrar implements ImportBeanDefinitionRegistrar {
private static final String BEAN_NAME = "outerContext";
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
if (registry.containsBeanDefinition(BEAN_NAME)) {
AbstractBeanDefinition configBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsBinderConfigurationProperties.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), configBean);
AbstractBeanDefinition catalogueBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsBindingInformationCatalogue.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), catalogueBean);
}
}
}
}

View File

@@ -18,7 +18,6 @@ package org.springframework.cloud.stream.binder.kafka.streams.bootstrap;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
@@ -33,7 +32,6 @@ import org.springframework.kafka.test.rule.KafkaEmbedded;
/**
* @author Soby Chacko
*/
@Ignore("Temporarily disabling the test as builds are getting slower due to this.")
public class KafkaStreamsBinderBootstrapTest {
@ClassRule
@@ -43,7 +41,8 @@ public class KafkaStreamsBinderBootstrapTest {
public void testKafkaStreamsBinderWithCustomEnvironmentCanStart() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.bindings.input.destination=foo",
.run("--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKafkaStreamsBinderWithCustomEnvironmentCanStart",
"--spring.cloud.stream.bindings.input.destination=foo",
"--spring.cloud.stream.bindings.input.binder=kBind1",
"--spring.cloud.stream.binders.kBind1.type=kstream",
"--spring.cloud.stream.binders.kBind1.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
@@ -56,7 +55,8 @@ public class KafkaStreamsBinderBootstrapTest {
public void testKafkaStreamsBinderWithStandardConfigurationCanStart() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.bindings.input.destination=foo",
.run("--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKafkaStreamsBinderWithStandardConfigurationCanStart",
"--spring.cloud.stream.bindings.input.destination=foo",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());