Function detector condition Kafka Streams

Use BeanFactoryUtils.beanNamesForTypeIncludingAncestors instead of
getBean from BeanFactory which forces the bean creation inside the
function detector condition. There was a race condition in which
applications were unable to autowire beans and use them in functions
while the detector condition was creating the beans. This change will
delay the creation of the function bean until it is needed.
This commit is contained in:
Soby Chacko
2019-08-16 11:54:35 -04:00
parent 18737b8fea
commit 183f21c880
2 changed files with 40 additions and 29 deletions

View File

@@ -265,21 +265,6 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
cleanupConfig.getIfUnique());
}
@Bean
@Conditional(FunctionDetectorCondition.class)
public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
ObjectProvider<CleanupConfig> cleanupConfig,
KafkaStreamsBindableProxyFactory bindableProxyFactory,
StreamFunctionProperties streamFunctionProperties) {
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
cleanupConfig.getIfUnique(), bindableProxyFactory, streamFunctionProperties);
}
@Bean
public KafkaStreamsMessageConversionDelegate messageConversionDelegate(
CompositeMessageConverter compositeMessageConverter,
@@ -360,4 +345,19 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
return new HashMap<>();
}
@Bean
@Conditional(FunctionDetectorCondition.class)
public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
ObjectProvider<CleanupConfig> cleanupConfig,
KafkaStreamsBindableProxyFactory bindableProxyFactory,
StreamFunctionProperties streamFunctionProperties) {
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
cleanupConfig.getIfUnique(), bindableProxyFactory, streamFunctionProperties);
}
}

View File

@@ -17,8 +17,9 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -28,6 +29,7 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
@@ -35,12 +37,13 @@ import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
/**
* Custom {@link org.springframework.context.annotation.Condition} that detects the presence
* of java.util.Function|Consumer beans. Used for Kafka Streams function support.
*
* @author Soby Chakco
* @author Soby Chacko
* @since 2.2.0
*/
public class FunctionDetectorCondition extends SpringBootCondition {
@@ -49,13 +52,21 @@ public class FunctionDetectorCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
if (context != null && context.getBeanFactory() != null) {
Map functionTypes = context.getBeanFactory().getBeansOfType(Function.class);
functionTypes.putAll(context.getBeanFactory().getBeansOfType(Consumer.class));
functionTypes.putAll(context.getBeanFactory().getBeansOfType(BiFunction.class));
functionTypes.putAll(context.getBeanFactory().getBeansOfType(BiConsumer.class));
final Map<String, Object> kstreamFunctions = pruneFunctionBeansForKafkaStreams(functionTypes, context);
if (!kstreamFunctions.isEmpty()) {
String[] functionTypes = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getBeanFactory(), Function.class, true, false);
String[] consumerTypes = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getBeanFactory(), Consumer.class, true, false);
String[] biFunctionTypes = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getBeanFactory(), BiFunction.class, true, false);
String[] biConsumerTypes = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getBeanFactory(), BiConsumer.class, true, false);
List<String> functionComponents = new ArrayList<>();
functionComponents.addAll(Arrays.asList(functionTypes));
functionComponents.addAll(Arrays.asList(consumerTypes));
functionComponents.addAll(Arrays.asList(biFunctionTypes));
functionComponents.addAll(Arrays.asList(biConsumerTypes));
List<String> kafkaStreamsFunctions = pruneFunctionBeansForKafkaStreams(functionComponents, context);
if (!CollectionUtils.isEmpty(kafkaStreamsFunctions)) {
return ConditionOutcome.match("Matched. Function/BiFunction/Consumer beans found");
}
else {
@@ -65,11 +76,11 @@ public class FunctionDetectorCondition extends SpringBootCondition {
return ConditionOutcome.noMatch("No match. No Function/BiFunction/Consumer beans found");
}
private static <T> Map<String, T> pruneFunctionBeansForKafkaStreams(Map<String, T> originalFunctionBeans,
private static List<String> pruneFunctionBeansForKafkaStreams(List<String> strings,
ConditionContext context) {
final Map<String, T> prunedMap = new HashMap<>();
final List<String> prunedList = new ArrayList<>();
for (String key : originalFunctionBeans.keySet()) {
for (String key : strings) {
final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
context.getBeanFactory().getBeanDefinition(key))
.getMetadata().getClassName(),
@@ -79,13 +90,13 @@ public class FunctionDetectorCondition extends SpringBootCondition {
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
final Class<?> rawClass = resolvableType.getGeneric(0).getRawClass();
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
prunedMap.put(key, originalFunctionBeans.get(key));
prunedList.add(key);
}
}
catch (NoSuchMethodException e) {
//ignore
}
}
return prunedMap;
return prunedList;
}
}