Kafka Streams functional support and autowiring

There are issues when a bean declared as a function in the Kafka Streams
application tries to autowire a bean through method parameter injection.
Addressing these concerns.

Resolves #726
This commit is contained in:
Soby Chacko
2019-08-20 18:40:01 -04:00
parent 245a43c1d8
commit 24e1fc9722
3 changed files with 23 additions and 7 deletions

View File

@@ -20,11 +20,14 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -48,6 +51,8 @@ import org.springframework.util.CollectionUtils;
*/
public class FunctionDetectorCondition extends SpringBootCondition {
private static final Log LOG = LogFactory.getLog(FunctionDetectorCondition.class);
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
@@ -86,15 +91,17 @@ public class FunctionDetectorCondition extends SpringBootCondition {
.getMetadata().getClassName(),
ClassUtils.getDefaultClassLoader());
try {
Method method = classObj.getMethod(key);
Method[] methods = classObj.getMethods();
Optional<Method> kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(key)).findFirst();
Method method = kafkaStreamMethod.get();
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
final Class<?> rawClass = resolvableType.getGeneric(0).getRawClass();
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
prunedList.add(key);
}
}
catch (NoSuchMethodException e) {
//ignore
catch (Exception e) {
LOG.error("Function not found: " + key, e);
}
}
return prunedList;

View File

@@ -54,7 +54,7 @@ public class KafkaStreamsFunctionAutoConfiguration {
@Bean
@Conditional(FunctionDetectorCondition.class)
public BeanFactoryPostProcessor implicitFunctionKafkaStreamsBinder(KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor) {
public static BeanFactoryPostProcessor implicitFunctionKafkaStreamsBinder(KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor) {
return beanFactory -> {
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;

View File

@@ -17,7 +17,9 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -25,6 +27,9 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
@@ -42,6 +47,8 @@ import org.springframework.util.ClassUtils;
*/
public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware {
private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionBeanPostProcessor.class);
private ConfigurableListableBeanFactory beanFactory;
private Map<String, ResolvableType> resolvableTypeMap = new TreeMap<>();
@@ -68,12 +75,14 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
.getMetadata().getClassName(),
ClassUtils.getDefaultClassLoader());
try {
Method method = classObj.getMethod(key);
Method[] methods = classObj.getMethods();
Optional<Method> kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(key)).findFirst();
Method method = kafkaStreamMethod.get();
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
resolvableTypeMap.put(key, resolvableType);
}
catch (NoSuchMethodException e) {
//ignore
catch (Exception e) {
LOG.error("Function not found: " + key, e);
}
}