Addressing mulit binder issues with Kafka Streams
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/815 There was an issue with Kafka Streams multi binders in which the properties were not scanned properly. The last configuation is always won, wiping out any prevous enviroment properties. Addressing this issue by properly keeping KafkaBinderConfigurationProperties per multi binder environment and explicity invoking Boot properties binding on them. Adding test to verify.
This commit is contained in:
@@ -53,6 +53,8 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.MutablePropertySources;
|
||||
import org.springframework.kafka.config.KafkaStreamsConfiguration;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
|
||||
@@ -61,6 +63,7 @@ import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandl
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@@ -155,7 +158,8 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
|
||||
protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix,
|
||||
ApplicationContext applicationContext, String inboundName,
|
||||
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
|
||||
StreamsBuilderFactoryBeanCustomizer customizer) {
|
||||
StreamsBuilderFactoryBeanCustomizer customizer,
|
||||
ConfigurableEnvironment environment, BindingProperties bindingProperties) {
|
||||
ConfigurableListableBeanFactory beanFactory = this.applicationContext
|
||||
.getBeanFactory();
|
||||
|
||||
@@ -178,6 +182,52 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
|
||||
}
|
||||
}
|
||||
|
||||
final MutablePropertySources propertySources = environment.getPropertySources();
|
||||
|
||||
if (!StringUtils.isEmpty(bindingProperties.getBinder())) {
|
||||
final KafkaStreamsBinderConfigurationProperties multiBinderKafkaStreamsBinderConfigurationProperties =
|
||||
applicationContext.getBean(bindingProperties.getBinder() + "-KafkaStreamsBinderConfigurationProperties", KafkaStreamsBinderConfigurationProperties.class);
|
||||
String connectionString = multiBinderKafkaStreamsBinderConfigurationProperties.getKafkaConnectionString();
|
||||
if (StringUtils.isEmpty(connectionString)) {
|
||||
connectionString = (String) propertySources.get(bindingProperties.getBinder() + "-kafkaStreamsBinderEnv").getProperty("spring.cloud.stream.kafka.binder.brokers");
|
||||
}
|
||||
if (!StringUtils.isEmpty(connectionString)) {
|
||||
streamConfigGlobalProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString);
|
||||
}
|
||||
|
||||
String binderProvidedApplicationId = multiBinderKafkaStreamsBinderConfigurationProperties.getApplicationId();
|
||||
if (StringUtils.hasText(binderProvidedApplicationId)) {
|
||||
streamConfigGlobalProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
|
||||
binderProvidedApplicationId);
|
||||
}
|
||||
|
||||
if (multiBinderKafkaStreamsBinderConfigurationProperties
|
||||
.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndContinue) {
|
||||
streamConfigGlobalProperties.put(
|
||||
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
|
||||
LogAndContinueExceptionHandler.class);
|
||||
}
|
||||
else if (multiBinderKafkaStreamsBinderConfigurationProperties
|
||||
.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndFail) {
|
||||
streamConfigGlobalProperties.put(
|
||||
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
|
||||
LogAndFailExceptionHandler.class);
|
||||
}
|
||||
else if (multiBinderKafkaStreamsBinderConfigurationProperties
|
||||
.getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
|
||||
streamConfigGlobalProperties.put(
|
||||
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
|
||||
RecoveringDeserializationExceptionHandler.class);
|
||||
SendToDlqAndContinue sendToDlqAndContinue = applicationContext.getBean(SendToDlqAndContinue.class);
|
||||
streamConfigGlobalProperties.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, sendToDlqAndContinue);
|
||||
}
|
||||
|
||||
if (!ObjectUtils.isEmpty(multiBinderKafkaStreamsBinderConfigurationProperties.getConfiguration())) {
|
||||
streamConfigGlobalProperties.putAll(multiBinderKafkaStreamsBinderConfigurationProperties.getConfiguration());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//this is only used primarily for StreamListener based processors. Although in theory, functions can use it,
|
||||
//it is ideal for functions to use the approach used in the above if statement by using a property like
|
||||
//spring.cloud.stream.kafka.streams.binder.functions.process.configuration.num.threads (assuming that process is the function name).
|
||||
|
||||
@@ -39,7 +39,8 @@ import org.springframework.context.annotation.Import;
|
||||
*/
|
||||
@Configuration
|
||||
@Import({ KafkaAutoConfiguration.class,
|
||||
KafkaStreamsBinderHealthIndicatorConfiguration.class })
|
||||
KafkaStreamsBinderHealthIndicatorConfiguration.class,
|
||||
MutliBinderPropertiesConfiguration.class})
|
||||
public class GlobalKTableBinderConfiguration {
|
||||
|
||||
@Bean
|
||||
@@ -72,10 +73,6 @@ public class GlobalKTableBinderConfiguration {
|
||||
// and as independent from the parent context.
|
||||
ApplicationContext outerContext = (ApplicationContext) beanFactory
|
||||
.getBean("outerContext");
|
||||
beanFactory.registerSingleton(
|
||||
KafkaStreamsBinderConfigurationProperties.class.getSimpleName(),
|
||||
outerContext
|
||||
.getBean(KafkaStreamsBinderConfigurationProperties.class));
|
||||
beanFactory.registerSingleton(
|
||||
KafkaStreamsExtendedBindingProperties.class.getSimpleName(),
|
||||
outerContext.getBean(KafkaStreamsExtendedBindingProperties.class));
|
||||
|
||||
@@ -37,7 +37,8 @@ import org.springframework.context.annotation.Import;
|
||||
*/
|
||||
@Configuration
|
||||
@Import({ KafkaAutoConfiguration.class,
|
||||
KafkaStreamsBinderHealthIndicatorConfiguration.class })
|
||||
KafkaStreamsBinderHealthIndicatorConfiguration.class,
|
||||
MutliBinderPropertiesConfiguration.class})
|
||||
public class KStreamBinderConfiguration {
|
||||
|
||||
@Bean
|
||||
@@ -74,10 +75,6 @@ public class KStreamBinderConfiguration {
|
||||
// and as independent from the parent context.
|
||||
ApplicationContext outerContext = (ApplicationContext) beanFactory
|
||||
.getBean("outerContext");
|
||||
beanFactory.registerSingleton(
|
||||
KafkaStreamsBinderConfigurationProperties.class.getSimpleName(),
|
||||
outerContext
|
||||
.getBean(KafkaStreamsBinderConfigurationProperties.class));
|
||||
beanFactory.registerSingleton(
|
||||
KafkaStreamsMessageConversionDelegate.class.getSimpleName(),
|
||||
outerContext.getBean(KafkaStreamsMessageConversionDelegate.class));
|
||||
|
||||
@@ -39,7 +39,8 @@ import org.springframework.context.annotation.Import;
|
||||
@SuppressWarnings("ALL")
|
||||
@Configuration
|
||||
@Import({ KafkaAutoConfiguration.class,
|
||||
KafkaStreamsBinderHealthIndicatorConfiguration.class })
|
||||
KafkaStreamsBinderHealthIndicatorConfiguration.class,
|
||||
MutliBinderPropertiesConfiguration.class})
|
||||
public class KTableBinderConfiguration {
|
||||
|
||||
@Bean
|
||||
@@ -71,10 +72,6 @@ public class KTableBinderConfiguration {
|
||||
// and as independent from the parent context.
|
||||
ApplicationContext outerContext = (ApplicationContext) beanFactory
|
||||
.getBean("outerContext");
|
||||
beanFactory.registerSingleton(
|
||||
KafkaStreamsBinderConfigurationProperties.class.getSimpleName(),
|
||||
outerContext
|
||||
.getBean(KafkaStreamsBinderConfigurationProperties.class));
|
||||
beanFactory.registerSingleton(
|
||||
KafkaStreamsExtendedBindingProperties.class.getSimpleName(),
|
||||
outerContext.getBean(KafkaStreamsExtendedBindingProperties.class));
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
@@ -37,7 +38,7 @@ class KafkaStreamsBinderHealthIndicatorConfiguration {
|
||||
@Bean
|
||||
@ConditionalOnBean(KafkaStreamsRegistry.class)
|
||||
KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator(
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry, @Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
|
||||
KafkaProperties kafkaProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
|
||||
|
||||
return new KafkaStreamsBinderHealthIndicator(kafkaStreamsRegistry, kafkaStreamsBinderConfigurationProperties,
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -29,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
|
||||
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
@@ -38,6 +40,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.bind.BindResult;
|
||||
import org.springframework.boot.context.properties.bind.Bindable;
|
||||
import org.springframework.boot.context.properties.bind.Binder;
|
||||
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
|
||||
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
|
||||
import org.springframework.cloud.stream.binder.BinderConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.function.FunctionDetectorCondition;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
|
||||
@@ -59,6 +66,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.integration.support.utils.IntegrationUtils;
|
||||
import org.springframework.kafka.config.KafkaStreamsConfiguration;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
|
||||
import org.springframework.kafka.core.CleanupConfig;
|
||||
@@ -66,6 +74,7 @@ import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandl
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@@ -91,7 +100,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
|
||||
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(
|
||||
KafkaProperties kafkaProperties, ConfigurableEnvironment environment,
|
||||
BindingServiceProperties properties) {
|
||||
BindingServiceProperties properties, ConfigurableApplicationContext context) throws Exception {
|
||||
final Map<String, BinderConfiguration> binderConfigurations = getBinderConfigurations(
|
||||
properties);
|
||||
for (Map.Entry<String, BinderConfiguration> entry : binderConfigurations
|
||||
@@ -104,7 +113,19 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
Map<String, Object> binderProperties = new HashMap<>();
|
||||
this.flatten(null, binderConfiguration.getProperties(), binderProperties);
|
||||
environment.getPropertySources().addFirst(
|
||||
new MapPropertySource("kafkaStreamsBinderEnv", binderProperties));
|
||||
new MapPropertySource(entry.getKey() + "-kafkaStreamsBinderEnv", binderProperties));
|
||||
|
||||
Binder binder = new Binder(ConfigurationPropertySources.get(environment),
|
||||
new PropertySourcesPlaceholdersResolver(environment),
|
||||
IntegrationUtils.getConversionService(context.getBeanFactory()), null);
|
||||
final Constructor<KafkaStreamsBinderConfigurationProperties> kafkaStreamsBinderConfigurationPropertiesConstructor =
|
||||
ReflectionUtils.accessibleConstructor(KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class);
|
||||
final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties =
|
||||
BeanUtils.instantiateClass(kafkaStreamsBinderConfigurationPropertiesConstructor, kafkaProperties);
|
||||
final BindResult<KafkaStreamsBinderConfigurationProperties> bind = binder.bind("spring.cloud.stream.kafka.streams.binder", Bindable.ofInstance(kafkaStreamsBinderConfigurationProperties));
|
||||
context.getBeanFactory().registerSingleton(
|
||||
entry.getKey() + "-KafkaStreamsBinderConfigurationProperties",
|
||||
bind.get());
|
||||
}
|
||||
}
|
||||
return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
|
||||
@@ -145,7 +166,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public KafkaStreamsConfiguration kafkaStreamsConfiguration(
|
||||
KafkaStreamsBinderConfigurationProperties properties,
|
||||
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties properties,
|
||||
Environment environment) {
|
||||
KafkaProperties kafkaProperties = properties.getKafkaProperties();
|
||||
Map<String, Object> streamsProperties = kafkaProperties.buildStreamsProperties();
|
||||
@@ -161,7 +182,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
|
||||
@Bean("streamConfigGlobalProperties")
|
||||
public Map<String, Object> streamConfigGlobalProperties(
|
||||
KafkaStreamsBinderConfigurationProperties configProperties,
|
||||
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties configProperties,
|
||||
KafkaStreamsConfiguration kafkaStreamsConfiguration, ConfigurableEnvironment environment,
|
||||
SendToDlqAndContinue sendToDlqAndContinue) {
|
||||
|
||||
@@ -270,12 +291,12 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
|
||||
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
|
||||
ObjectProvider<CleanupConfig> cleanupConfig,
|
||||
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
|
||||
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider, ConfigurableEnvironment environment) {
|
||||
return new KafkaStreamsStreamListenerSetupMethodOrchestrator(
|
||||
bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
|
||||
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
|
||||
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters,
|
||||
cleanupConfig.getIfUnique(), customizerProvider.getIfUnique());
|
||||
cleanupConfig.getIfUnique(), customizerProvider.getIfUnique(), environment);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -284,7 +305,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
CompositeMessageConverter compositeMessageConverter,
|
||||
SendToDlqAndContinue sendToDlqAndContinue,
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
|
||||
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
|
||||
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
|
||||
return new KafkaStreamsMessageConversionDelegate(compositeMessageConverter, sendToDlqAndContinue,
|
||||
KafkaStreamsBindingInformationCatalogue, binderConfigurationProperties);
|
||||
}
|
||||
@@ -339,7 +360,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
@ConditionalOnMissingBean
|
||||
public KeyValueSerdeResolver keyValueSerdeResolver(
|
||||
@Qualifier("streamConfigGlobalProperties") Object streamConfigGlobalProperties,
|
||||
KafkaStreamsBinderConfigurationProperties properties) {
|
||||
@Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties properties) {
|
||||
return new KeyValueSerdeResolver(
|
||||
(Map<String, Object>) streamConfigGlobalProperties, properties);
|
||||
}
|
||||
@@ -347,7 +368,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
@Bean
|
||||
public InteractiveQueryService interactiveQueryServices(
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry,
|
||||
KafkaStreamsBinderConfigurationProperties properties) {
|
||||
@Qualifier("binderConfigurationProperties")KafkaStreamsBinderConfigurationProperties properties) {
|
||||
return new InteractiveQueryService(kafkaStreamsRegistry, properties);
|
||||
}
|
||||
|
||||
@@ -373,12 +394,12 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
|
||||
ObjectProvider<CleanupConfig> cleanupConfig,
|
||||
StreamFunctionProperties streamFunctionProperties,
|
||||
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
|
||||
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
|
||||
@Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
|
||||
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider, ConfigurableEnvironment environment) {
|
||||
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
|
||||
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
|
||||
cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties,
|
||||
customizerProvider.getIfUnique());
|
||||
customizerProvider.getIfUnique(), environment);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -55,6 +55,7 @@ import org.springframework.cloud.stream.config.BindingServiceProperties;
|
||||
import org.springframework.cloud.stream.function.FunctionConstants;
|
||||
import org.springframework.cloud.stream.function.StreamFunctionProperties;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
|
||||
import org.springframework.kafka.core.CleanupConfig;
|
||||
@@ -81,6 +82,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
private StreamFunctionProperties streamFunctionProperties;
|
||||
private KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;
|
||||
StreamsBuilderFactoryBeanCustomizer customizer;
|
||||
ConfigurableEnvironment environment;
|
||||
|
||||
public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
|
||||
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
|
||||
@@ -90,7 +92,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
CleanupConfig cleanupConfig,
|
||||
StreamFunctionProperties streamFunctionProperties,
|
||||
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
|
||||
StreamsBuilderFactoryBeanCustomizer customizer) {
|
||||
StreamsBuilderFactoryBeanCustomizer customizer, ConfigurableEnvironment environment) {
|
||||
super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties,
|
||||
keyValueSerdeResolver, cleanupConfig);
|
||||
this.bindingServiceProperties = bindingServiceProperties;
|
||||
@@ -101,6 +103,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
this.streamFunctionProperties = streamFunctionProperties;
|
||||
this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
|
||||
this.customizer = customizer;
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType,
|
||||
@@ -295,7 +298,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
//Otherwise, create the StreamsBuilderFactory and get the underlying config.
|
||||
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext,
|
||||
input, kafkaStreamsBinderConfigurationProperties, customizer);
|
||||
input, kafkaStreamsBinderConfigurationProperties, customizer, this.environment, bindingProperties);
|
||||
this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean);
|
||||
}
|
||||
try {
|
||||
|
||||
@@ -53,6 +53,7 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
|
||||
import org.springframework.kafka.core.CleanupConfig;
|
||||
@@ -101,6 +102,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
|
||||
|
||||
StreamsBuilderFactoryBeanCustomizer customizer;
|
||||
|
||||
private final ConfigurableEnvironment environment;
|
||||
|
||||
KafkaStreamsStreamListenerSetupMethodOrchestrator(
|
||||
BindingServiceProperties bindingServiceProperties,
|
||||
KafkaStreamsExtendedBindingProperties extendedBindingProperties,
|
||||
@@ -109,7 +112,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
|
||||
StreamListenerParameterAdapter streamListenerParameterAdapter,
|
||||
Collection<StreamListenerResultAdapter> listenerResultAdapters,
|
||||
CleanupConfig cleanupConfig,
|
||||
StreamsBuilderFactoryBeanCustomizer customizer) {
|
||||
StreamsBuilderFactoryBeanCustomizer customizer,
|
||||
ConfigurableEnvironment environment) {
|
||||
super(bindingServiceProperties, bindingInformationCatalogue, extendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
|
||||
this.bindingServiceProperties = bindingServiceProperties;
|
||||
this.kafkaStreamsExtendedBindingProperties = extendedBindingProperties;
|
||||
@@ -118,6 +122,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
|
||||
this.streamListenerParameterAdapter = streamListenerParameterAdapter;
|
||||
this.streamListenerResultAdapters = listenerResultAdapters;
|
||||
this.customizer = customizer;
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -249,7 +254,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
|
||||
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(method.getDeclaringClass().getSimpleName() + "-" + method.getName(),
|
||||
applicationContext,
|
||||
inboundName, null, customizer);
|
||||
inboundName, null, customizer, this.environment, bindingProperties);
|
||||
this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderFactoryBean);
|
||||
}
|
||||
try {
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class MutliBinderPropertiesConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
|
||||
public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) {
|
||||
return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
|
||||
}
|
||||
}
|
||||
@@ -76,7 +76,7 @@ public class MultipleFunctionsInSameAppTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKstreamWordCountFunction() throws InterruptedException {
|
||||
public void testMultiFunctionsInSameApp() throws InterruptedException {
|
||||
SpringApplication app = new SpringApplication(MultipleFunctionsInSameApp.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
@@ -111,6 +111,51 @@ public class MultipleFunctionsInSameAppTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiFunctionsInSameAppWithMultiBinders() throws InterruptedException {
|
||||
SpringApplication app = new SpringApplication(MultipleFunctionsInSameApp.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.function.definition=process;analyze",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
|
||||
"--spring.cloud.stream.bindings.process-in-0.binder=kafka1",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
|
||||
"--spring.cloud.stream.bindings.process-out-0.binder=kafka1",
|
||||
"--spring.cloud.stream.bindings.process-out-1.destination=electronics",
|
||||
"--spring.cloud.stream.bindings.process-out-1.binder=kafka1",
|
||||
"--spring.cloud.stream.bindings.analyze-in-0.destination=coffee",
|
||||
"--spring.cloud.stream.bindings.analyze-in-0.binder=kafka2",
|
||||
"--spring.cloud.stream.bindings.analyze-in-1.destination=electronics",
|
||||
"--spring.cloud.stream.bindings.analyze-in-1.binder=kafka2",
|
||||
"--spring.cloud.stream.binders.kafka1.type=kstream",
|
||||
"--spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
|
||||
"--spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.applicationId=my-app-1",
|
||||
"--spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.client.id=process-client",
|
||||
"--spring.cloud.stream.binders.kafka2.type=kstream",
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.applicationId=my-app-2",
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.client.id=analyze-client")) {
|
||||
receiveAndValidate("purchases", "coffee", "electronics");
|
||||
|
||||
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context
|
||||
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
|
||||
|
||||
StreamsBuilderFactoryBean analyzeStreamsBuilderFactoryBean = context
|
||||
.getBean("&stream-builder-analyze", StreamsBuilderFactoryBean.class);
|
||||
|
||||
final Properties processStreamsConfiguration = processStreamsBuilderFactoryBean.getStreamsConfiguration();
|
||||
final Properties analyzeStreamsConfiguration = analyzeStreamsBuilderFactoryBean.getStreamsConfiguration();
|
||||
|
||||
assertThat(processStreamsConfiguration.getProperty("application.id")).isEqualTo("my-app-1");
|
||||
assertThat(analyzeStreamsConfiguration.getProperty("application.id")).isEqualTo("my-app-2");
|
||||
assertThat(processStreamsConfiguration.getProperty("client.id")).isEqualTo("process-client");
|
||||
assertThat(analyzeStreamsConfiguration.getProperty("client.id")).isEqualTo("analyze-client");
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveAndValidate(String in, String... out) throws InterruptedException {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
|
||||
Reference in New Issue
Block a user