Kafka Streams component bean scanning enhancements

When functional beans are discovered from libraries in the classpath,
it causes issues when Kafka Streams functions are scanned and bootstrapped.
Binder expects the users to provide function definition property although the
application does not directly include these functional beans or is aware of it.
Fixing this issue by excluding non kafka streams function from scanning.

Null check around adding micrometer listener to the StreamsBuilder.

Addressing issues raised by the comments here:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087
This commit is contained in:
Soby Chacko
2021-03-22 18:55:20 -04:00
parent f8b290844b
commit f25dbff2b7
2 changed files with 52 additions and 13 deletions

View File

@@ -81,7 +81,9 @@ class StreamsBuilderFactoryManager implements SmartLifecycle {
.getStreamsBuilderFactoryBeans();
int n = 0;
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
streamsBuilderFactoryBean.addListener(this.listener);
if (this.listener != null) {
streamsBuilderFactoryBean.addListener(this.listener);
}
streamsBuilderFactoryBean.start();
this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -65,6 +66,9 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
private final StreamFunctionProperties streamFunctionProperties;
private Map<String, ResolvableType> kafkaStreamsOnlyResolvableTypes = new HashMap<>();
private Map<String, Method> kafakStreamsOnlyMethods = new HashMap<>();
public KafkaStreamsFunctionBeanPostProcessor(StreamFunctionProperties streamFunctionProperties) {
this.streamFunctionProperties = streamFunctionProperties;
}
@@ -89,10 +93,14 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
Stream.concat(Stream.of(biFunctionNames), Stream.of(biConsumerNames)));
final List<String> collect = concat.collect(Collectors.toList());
collect.removeIf(s -> Arrays.stream(EXCLUDE_FUNCTIONS).anyMatch(t -> t.equals(s)));
onlySingleFunction = collect.size() == 1;
collect.stream()
.forEach(this::extractResolvableTypes);
kafkaStreamsOnlyResolvableTypes.keySet().forEach(k -> addResolvableTypeInfo(k, kafkaStreamsOnlyResolvableTypes.get(k)));
kafakStreamsOnlyMethods.keySet().forEach(k -> addResolvableTypeInfo(k, kafakStreamsOnlyMethods.get(k)));
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
for (String s : getResolvableTypes().keySet()) {
@@ -115,15 +123,15 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
ClassUtils.getDefaultClassLoader());
try {
Method[] methods = classObj.getMethods();
Optional<Method> kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(key)).findFirst();
if (!kafkaStreamMethod.isPresent()) {
Optional<Method> functionalBeanMethods = Arrays.stream(methods).filter(m -> m.getName().equals(key)).findFirst();
if (!functionalBeanMethods.isPresent()) {
final BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key);
final String factoryMethodName = beanDefinition.getFactoryMethodName();
kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(factoryMethodName)).findFirst();
functionalBeanMethods = Arrays.stream(methods).filter(m -> m.getName().equals(factoryMethodName)).findFirst();
}
if (kafkaStreamMethod.isPresent()) {
Method method = kafkaStreamMethod.get();
if (functionalBeanMethods.isPresent()) {
Method method = functionalBeanMethods.get();
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
final Class<?> rawClass = resolvableType.getGeneric(0).getRawClass();
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
@@ -131,7 +139,7 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
resolvableTypeMap.put(key, resolvableType);
}
else {
addResolvableTypeInfo(key, resolvableType);
discoverOnlyKafkaStreamsResolvableTypes(key, resolvableType);
}
}
}
@@ -149,7 +157,7 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
this.methods.put(key, method);
}
else {
addResolvableTypeInfo(key, resolvableType);
discoverOnlyKafkaStreamsResolvableTypesAndMethods(key, resolvableType, method);
}
}
}
@@ -161,13 +169,42 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
}
private void addResolvableTypeInfo(String key, ResolvableType resolvableType) {
final String definition = streamFunctionProperties.getDefinition();
if (definition == null) {
throw new IllegalStateException("Multiple functions found, but function definition property is not set.");
}
else if (definition.contains(key)) {
if (kafkaStreamsOnlyResolvableTypes.size() == 1) {
resolvableTypeMap.put(key, resolvableType);
}
else {
final String definition = streamFunctionProperties.getDefinition();
if (definition == null) {
throw new IllegalStateException("Multiple functions found, but function definition property is not set.");
}
else if (definition.contains(key)) {
resolvableTypeMap.put(key, resolvableType);
}
}
}
private void discoverOnlyKafkaStreamsResolvableTypes(String key, ResolvableType resolvableType) {
kafkaStreamsOnlyResolvableTypes.put(key, resolvableType);
}
private void discoverOnlyKafkaStreamsResolvableTypesAndMethods(String key, ResolvableType resolvableType, Method method) {
kafkaStreamsOnlyResolvableTypes.put(key, resolvableType);
kafakStreamsOnlyMethods.put(key, method);
}
private void addResolvableTypeInfo(String key, Method method) {
if (kafakStreamsOnlyMethods.size() == 1) {
this.methods.put(key, method);
}
else {
final String definition = streamFunctionProperties.getDefinition();
if (definition == null) {
throw new IllegalStateException("Multiple functions found, but function definition property is not set.");
}
else if (definition.contains(key)) {
this.methods.put(key, method);
}
}
}
private boolean isKafkaStreamsTypeFound(Method method) {