Enable customization on StreamsBuilderFactoryBean

Spring Kafka provides a StreamsBuilderFactoryBeanCustomizer. Use this in the binder so that the
applicatons can plugin in such a bean to further customize the StreamsBuilderFactoryBean and KafkaStreams.

Resolves #784
This commit is contained in:
Soby Chacko
2019-10-24 09:35:26 -04:00
parent 28a02cda4f
commit 7f09baf72d
4 changed files with 28 additions and 9 deletions

View File

@@ -53,6 +53,7 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@@ -150,7 +151,8 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
@SuppressWarnings({"unchecked"})
protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix,
ApplicationContext applicationContext, String inboundName,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanCustomizer customizer) {
ConfigurableListableBeanFactory beanFactory = this.applicationContext
.getBeanFactory();
@@ -216,6 +218,9 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration)
: new StreamsBuilderFactoryBean(kafkaStreamsConfiguration,
this.cleanupConfig);
if (customizer != null) {
customizer.configure(streamsBuilder);
}
streamsBuilder.setAutoStartup(false);
BeanDefinition streamsBuilderBeanDefinition = BeanDefinitionBuilder
.genericBeanDefinition(

View File

@@ -60,6 +60,7 @@ import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.lang.Nullable;
@@ -268,12 +269,13 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
ObjectProvider<CleanupConfig> cleanupConfig) {
ObjectProvider<CleanupConfig> cleanupConfig,
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
return new KafkaStreamsStreamListenerSetupMethodOrchestrator(
bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters,
cleanupConfig.getIfUnique());
cleanupConfig.getIfUnique(), customizerProvider.getIfUnique());
}
@Bean
@@ -370,10 +372,12 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
ObjectProvider<CleanupConfig> cleanupConfig,
StreamFunctionProperties streamFunctionProperties,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties);
cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties,
customizerProvider.getIfUnique());
}
@Bean

View File

@@ -56,6 +56,7 @@ import org.springframework.cloud.stream.function.FunctionConstants;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -79,6 +80,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
private BeanFactory beanFactory;
private StreamFunctionProperties streamFunctionProperties;
private KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;
StreamsBuilderFactoryBeanCustomizer customizer;
public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
@@ -87,7 +89,8 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
CleanupConfig cleanupConfig,
StreamFunctionProperties streamFunctionProperties,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanCustomizer customizer) {
super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, cleanupConfig);
this.bindingServiceProperties = bindingServiceProperties;
@@ -97,6 +100,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.streamFunctionProperties = streamFunctionProperties;
this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
this.customizer = customizer;
}
private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType,
@@ -290,7 +294,8 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
//Retrieve the StreamsConfig created for this method if available.
//Otherwise, create the StreamsBuilderFactory and get the underlying config.
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext, input, kafkaStreamsBinderConfigurationProperties);
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext,
input, kafkaStreamsBinderConfigurationProperties, customizer);
this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean);
}
try {

View File

@@ -54,6 +54,7 @@ import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.Assert;
@@ -98,6 +99,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
StreamsBuilderFactoryBeanCustomizer customizer;
KafkaStreamsStreamListenerSetupMethodOrchestrator(
BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties extendedBindingProperties,
@@ -105,7 +108,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
KafkaStreamsBindingInformationCatalogue bindingInformationCatalogue,
StreamListenerParameterAdapter streamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> listenerResultAdapters,
CleanupConfig cleanupConfig) {
CleanupConfig cleanupConfig,
StreamsBuilderFactoryBeanCustomizer customizer) {
super(bindingServiceProperties, bindingInformationCatalogue, extendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsExtendedBindingProperties = extendedBindingProperties;
@@ -113,6 +117,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
this.kafkaStreamsBindingInformationCatalogue = bindingInformationCatalogue;
this.streamListenerParameterAdapter = streamListenerParameterAdapter;
this.streamListenerResultAdapters = listenerResultAdapters;
this.customizer = customizer;
}
@Override
@@ -244,7 +249,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(method.getDeclaringClass().getSimpleName() + "-" + method.getName(),
applicationContext,
inboundName, null);
inboundName, null, customizer);
this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderFactoryBean);
}
try {