@@ -43,17 +43,15 @@ import org.springframework.util.ClassUtils;
|
||||
*/
|
||||
public class FunctionDetectorCondition extends SpringBootCondition {
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
|
||||
if (context != null && context.getBeanFactory() != null) {
|
||||
Map functionTypes = context.getBeanFactory().getBeansOfType(Function.class);
|
||||
functionTypes.putAll(context.getBeanFactory().getBeansOfType(Consumer.class));
|
||||
final Map<String, Object> kstreamFunctions = pruneFunctionBeansForKafkaStreams(functionTypes, context);
|
||||
|
||||
final Map<String, Function> functionTypes = context.getBeanFactory().getBeansOfType(Function.class);
|
||||
final Map<String, Consumer> consumerTypes = context.getBeanFactory().getBeansOfType(Consumer.class);
|
||||
|
||||
final Map<String, Function> prunedFunctionMap = pruneFunctionBeansForKafkaStreams(functionTypes, context);
|
||||
final Map<String, Consumer> prunedConsumerMap = pruneFunctionBeansForKafkaStreams(consumerTypes, context);
|
||||
|
||||
if (!prunedFunctionMap.isEmpty() || !prunedConsumerMap.isEmpty()) {
|
||||
if (!kstreamFunctions.isEmpty()) {
|
||||
return ConditionOutcome.match("Matched. Function/Consumer beans found");
|
||||
}
|
||||
else {
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
@@ -49,11 +50,10 @@ class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFac
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
|
||||
final Map<String, Function> functionTypes = this.beanFactory.getBeansOfType(Function.class);
|
||||
final Map<String, Consumer> consumerTypes = this.beanFactory.getBeansOfType(Consumer.class);
|
||||
String[] functionNames = this.beanFactory.getBeanNamesForType(Function.class);
|
||||
String[] consumerNames = this.beanFactory.getBeanNamesForType(Consumer.class);
|
||||
|
||||
functionTypes.keySet().forEach(this::extractResolvableTypes);
|
||||
consumerTypes.keySet().forEach(this::extractResolvableTypes);
|
||||
Stream.concat(Stream.of(functionNames), Stream.of(consumerNames)).forEach(this::extractResolvableTypes);
|
||||
}
|
||||
|
||||
private void extractResolvableTypes(String key) {
|
||||
|
||||
Reference in New Issue
Block a user