Kafka Streams function detection improvements (#1033)
* Kafka Streams function detection improvements Allow Kafka Streams functions defined as Component beans to be candidates for establishing bindings. Currently, Kafka Streams functions need to be written as functional beans using @Bean. Adding this improvement so that if applications prefer to write the business logic using @Component, then it is possible to do so. Adding test cases to verify the behavior. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030 * Kafka Streams functions and bean name overriding Whn Kafka Streams function bean names are overridden, there is an issue with scanning it properly for binding. Addressing this issue. * Adding docs for Component based model * Addressing PR review comments
This commit is contained in:
@@ -77,6 +77,22 @@ NOTE: If the destination property is not set on the binding, a topic is created
|
||||
|
||||
Once built as a uber-jar (e.g., `kstream-consumer-app.jar`), you can run the above example like the following.
|
||||
|
||||
If the applications choose to define the functional beans using Spring's `Component` annotation, the binder also suppports that model.
|
||||
The above functional bean could be rewritten as below.
|
||||
|
||||
```
|
||||
@Component(name = "process")
|
||||
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
|
||||
|
||||
@Override
|
||||
public void accept(KStream<Object, String> input) {
|
||||
input.foreach((key, value) -> {
|
||||
System.out.println("Key: " + key + " Value: " + value);
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
[source]
|
||||
----
|
||||
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@@ -107,9 +108,21 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
}
|
||||
|
||||
private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType,
|
||||
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory) {
|
||||
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory,
|
||||
Method method, String functionName) {
|
||||
Map<String, ResolvableType> resolvableTypeMap = new LinkedHashMap<>();
|
||||
if (resolvableType != null && resolvableType.getRawClass() != null) {
|
||||
if (method != null) { // Component functional bean.
|
||||
final ResolvableType firstMethodParameter = ResolvableType.forMethodParameter(method, 0);
|
||||
ResolvableType currentOutputGeneric = ResolvableType.forMethodReturnType(method);
|
||||
|
||||
final Set<String> inputs = new LinkedHashSet<>(kafkaStreamsBindableProxyFactory.getInputs());
|
||||
final Iterator<String> iterator = inputs.iterator();
|
||||
populateResolvableTypeMap(firstMethodParameter, resolvableTypeMap, iterator, method, functionName);
|
||||
|
||||
final Class<?> outputRawclass = currentOutputGeneric.getRawClass();
|
||||
traverseReturnTypeForComponentBeans(resolvableTypeMap, currentOutputGeneric, inputs, iterator, outputRawclass);
|
||||
}
|
||||
else if (resolvableType != null && resolvableType.getRawClass() != null) {
|
||||
int inputCount = 1;
|
||||
|
||||
ResolvableType currentOutputGeneric;
|
||||
@@ -129,7 +142,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
|
||||
final Iterator<String> iterator = inputs.iterator();
|
||||
|
||||
popuateResolvableTypeMap(resolvableType, resolvableTypeMap, iterator);
|
||||
populateResolvableTypeMap(resolvableType, resolvableTypeMap, iterator);
|
||||
|
||||
ResolvableType iterableResType = resolvableType;
|
||||
int i = resolvableType.getRawClass().isAssignableFrom(BiFunction.class) ||
|
||||
@@ -143,7 +156,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
iterableResType = iterableResType.getGeneric(1);
|
||||
if (iterableResType.getRawClass() != null &&
|
||||
functionOrConsumerFound(iterableResType)) {
|
||||
popuateResolvableTypeMap(iterableResType, resolvableTypeMap, iterator);
|
||||
populateResolvableTypeMap(iterableResType, resolvableTypeMap, iterator);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
@@ -154,12 +167,32 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
return resolvableTypeMap;
|
||||
}
|
||||
|
||||
private void traverseReturnTypeForComponentBeans(Map<String, ResolvableType> resolvableTypeMap, ResolvableType currentOutputGeneric,
|
||||
Set<String> inputs, Iterator<String> iterator, Class<?> outputRawclass) {
|
||||
if (outputRawclass != null && !outputRawclass.equals(Void.TYPE)) {
|
||||
ResolvableType iterableResType = currentOutputGeneric;
|
||||
int i = 1;
|
||||
// Traverse through the return signature.
|
||||
while (i < inputs.size() && iterator.hasNext()) {
|
||||
if (iterableResType.getRawClass() != null &&
|
||||
functionOrConsumerFound(iterableResType)) {
|
||||
populateResolvableTypeMap(iterableResType, resolvableTypeMap, iterator);
|
||||
}
|
||||
iterableResType = iterableResType.getGeneric(1);
|
||||
i++;
|
||||
}
|
||||
if (iterableResType.getRawClass() != null && KStream.class.isAssignableFrom(iterableResType.getRawClass())) {
|
||||
resolvableTypeMap.put(OUTBOUND, iterableResType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean functionOrConsumerFound(ResolvableType iterableResType) {
|
||||
return iterableResType.getRawClass().equals(Function.class) ||
|
||||
iterableResType.getRawClass().equals(Consumer.class);
|
||||
}
|
||||
|
||||
private void popuateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap,
|
||||
private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap,
|
||||
Iterator<String> iterator) {
|
||||
final String next = iterator.next();
|
||||
resolvableTypeMap.put(next, resolvableType.getGeneric(0));
|
||||
@@ -171,6 +204,18 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
}
|
||||
}
|
||||
|
||||
private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap,
|
||||
Iterator<String> iterator, Method method, String functionName) {
|
||||
final String next = iterator.next();
|
||||
resolvableTypeMap.put(next, resolvableType);
|
||||
if (method != null) {
|
||||
final Object bean = beanFactory.getBean(functionName);
|
||||
if (BiFunction.class.isAssignableFrom(bean.getClass()) || BiConsumer.class.isAssignableFrom(bean.getClass())) {
|
||||
resolvableTypeMap.put(iterator.next(), ResolvableType.forMethodParameter(method, 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method must be kept stateless. In the case of multiple function beans in an application,
|
||||
* isolated {@link KafkaStreamsBindableProxyFactory} instances are passed in separately for those functions. If the
|
||||
@@ -183,10 +228,11 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void setupFunctionInvokerForKafkaStreams(ResolvableType resolvableType, String functionName,
|
||||
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory) {
|
||||
final Map<String, ResolvableType> stringResolvableTypeMap = buildTypeMap(resolvableType, kafkaStreamsBindableProxyFactory);
|
||||
ResolvableType outboundResolvableType = stringResolvableTypeMap.remove(OUTBOUND);
|
||||
Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(stringResolvableTypeMap, functionName);
|
||||
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory, Method method) {
|
||||
final Map<String, ResolvableType> resolvableTypes = buildTypeMap(resolvableType,
|
||||
kafkaStreamsBindableProxyFactory, method, functionName);
|
||||
ResolvableType outboundResolvableType = resolvableTypes.remove(OUTBOUND);
|
||||
Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(resolvableTypes, functionName);
|
||||
try {
|
||||
if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(Consumer.class)) {
|
||||
Consumer<Object> consumer = (Consumer) this.beanFactory.getBean(functionName);
|
||||
@@ -196,6 +242,49 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
BiConsumer<Object, Object> biConsumer = (BiConsumer) this.beanFactory.getBean(functionName);
|
||||
biConsumer.accept(adaptedInboundArguments[0], adaptedInboundArguments[1]);
|
||||
}
|
||||
else if (method != null) { // Handling component functional beans
|
||||
final Object bean = beanFactory.getBean(functionName);
|
||||
if (Consumer.class.isAssignableFrom(bean.getClass())) {
|
||||
((Consumer) bean).accept(adaptedInboundArguments[0]);
|
||||
}
|
||||
else if (BiConsumer.class.isAssignableFrom(bean.getClass())) {
|
||||
((BiConsumer) bean).accept(adaptedInboundArguments[0], adaptedInboundArguments[1]);
|
||||
}
|
||||
else if (Function.class.isAssignableFrom(bean.getClass()) || BiFunction.class.isAssignableFrom(bean.getClass())) {
|
||||
Object result;
|
||||
if (BiFunction.class.isAssignableFrom(bean.getClass())) {
|
||||
result = ((BiFunction) bean).apply(adaptedInboundArguments[0], adaptedInboundArguments[1]);
|
||||
}
|
||||
else {
|
||||
result = ((Function) bean).apply(adaptedInboundArguments[0]);
|
||||
}
|
||||
int i = 1;
|
||||
while (result instanceof Function || result instanceof Consumer) {
|
||||
if (result instanceof Function) {
|
||||
result = ((Function) result).apply(adaptedInboundArguments[i]);
|
||||
}
|
||||
else {
|
||||
((Consumer) result).accept(adaptedInboundArguments[i]);
|
||||
result = null;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
if (result != null) {
|
||||
final Set<String> outputs = new TreeSet<>(kafkaStreamsBindableProxyFactory.getOutputs());
|
||||
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
|
||||
if (result.getClass().isArray()) {
|
||||
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory, outboundResolvableType, (Object[]) result);
|
||||
}
|
||||
else {
|
||||
if (outboundDefinitionIterator.hasNext()) {
|
||||
Object targetBean = handleSingleKStreamOutbound((KStream) result, outboundDefinitionIterator);
|
||||
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean,
|
||||
outboundResolvableType);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
Object result;
|
||||
if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(BiFunction.class)) {
|
||||
@@ -222,44 +311,13 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
|
||||
|
||||
if (result.getClass().isArray()) {
|
||||
// Binding target as the output bindings were deferred in the KafkaStreamsBindableProxyFactory
|
||||
// due to the fact that it didn't know the returned array size. At this point in the execution,
|
||||
// we know exactly the number of outbound components (from the array length), so do the binding.
|
||||
final int length = ((Object[]) result).length;
|
||||
|
||||
List<String> outputBindings = getOutputBindings(functionName, length);
|
||||
Iterator<String> iterator = outputBindings.iterator();
|
||||
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
|
||||
Object[] outboundKStreams = (Object[]) result;
|
||||
|
||||
for (int ij = 0; ij < length; ij++) {
|
||||
|
||||
String next = iterator.next();
|
||||
kafkaStreamsBindableProxyFactory.addOutputBinding(next, KStream.class);
|
||||
RootBeanDefinition rootBeanDefinition1 = new RootBeanDefinition();
|
||||
rootBeanDefinition1.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(next).getBoundTarget());
|
||||
registry.registerBeanDefinition(next, rootBeanDefinition1);
|
||||
|
||||
Object targetBean = this.applicationContext.getBean(next);
|
||||
|
||||
KStreamBoundElementFactory.KStreamWrapper
|
||||
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
|
||||
boundElement.wrap((KStream) outboundKStreams[ij]);
|
||||
|
||||
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
|
||||
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
|
||||
}
|
||||
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory, outboundResolvableType, (Object[]) result);
|
||||
}
|
||||
else {
|
||||
if (outboundDefinitionIterator.hasNext()) {
|
||||
final String next = outboundDefinitionIterator.next();
|
||||
Object targetBean = this.applicationContext.getBean(next);
|
||||
KStreamBoundElementFactory.KStreamWrapper
|
||||
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
|
||||
boundElement.wrap((KStream) result);
|
||||
|
||||
Object targetBean = handleSingleKStreamOutbound((KStream) result, outboundDefinitionIterator);
|
||||
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
|
||||
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
|
||||
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -270,6 +328,46 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
}
|
||||
}
|
||||
|
||||
private Object handleSingleKStreamOutbound(KStream result, Iterator<String> outboundDefinitionIterator) {
|
||||
final String next = outboundDefinitionIterator.next();
|
||||
Object targetBean = this.applicationContext.getBean(next);
|
||||
KStreamBoundElementFactory.KStreamWrapper
|
||||
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
|
||||
boundElement.wrap(result);
|
||||
return targetBean;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private void handleKStreamArrayOutbound(ResolvableType resolvableType, String functionName,
|
||||
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory,
|
||||
ResolvableType outboundResolvableType, Object[] result) {
|
||||
// Binding target as the output bindings were deferred in the KafkaStreamsBindableProxyFactory
|
||||
// due to the fact that it didn't know the returned array size. At this point in the execution,
|
||||
// we know exactly the number of outbound components (from the array length), so do the binding.
|
||||
final int length = result.length;
|
||||
|
||||
List<String> outputBindings = getOutputBindings(functionName, length);
|
||||
Iterator<String> iterator = outputBindings.iterator();
|
||||
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
|
||||
|
||||
for (Object o : result) {
|
||||
String next = iterator.next();
|
||||
kafkaStreamsBindableProxyFactory.addOutputBinding(next, KStream.class);
|
||||
RootBeanDefinition rootBeanDefinition1 = new RootBeanDefinition();
|
||||
rootBeanDefinition1.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(next).getBoundTarget());
|
||||
registry.registerBeanDefinition(next, rootBeanDefinition1);
|
||||
|
||||
Object targetBean = this.applicationContext.getBean(next);
|
||||
|
||||
KStreamBoundElementFactory.KStreamWrapper
|
||||
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
|
||||
boundElement.wrap((KStream) o);
|
||||
|
||||
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
|
||||
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getOutputBindings(String functionName, int outputs) {
|
||||
List<String> outputBindings = this.streamFunctionProperties.getOutputBindings(functionName);
|
||||
List<String> outputBindingNames = new ArrayList<>();
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KTable;
|
||||
|
||||
import org.springframework.beans.factory.BeanFactoryUtils;
|
||||
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
|
||||
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
|
||||
import org.springframework.context.annotation.ConditionContext;
|
||||
@@ -81,18 +82,26 @@ public class FunctionDetectorCondition extends SpringBootCondition {
|
||||
return ConditionOutcome.noMatch("No match. No Function/BiFunction/Consumer beans found");
|
||||
}
|
||||
|
||||
private static List<String> pruneFunctionBeansForKafkaStreams(List<String> strings,
|
||||
private static List<String> pruneFunctionBeansForKafkaStreams(List<String> functionComponents,
|
||||
ConditionContext context) {
|
||||
final List<String> prunedList = new ArrayList<>();
|
||||
|
||||
for (String key : strings) {
|
||||
for (String key : functionComponents) {
|
||||
final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
|
||||
context.getBeanFactory().getBeanDefinition(key))
|
||||
.getMetadata().getClassName(),
|
||||
ClassUtils.getDefaultClassLoader());
|
||||
try {
|
||||
|
||||
Method[] methods = classObj.getMethods();
|
||||
Optional<Method> kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(key)).findFirst();
|
||||
// check if the bean name is overridden.
|
||||
if (!kafkaStreamMethod.isPresent()) {
|
||||
final BeanDefinition beanDefinition = context.getBeanFactory().getBeanDefinition(key);
|
||||
final String factoryMethodName = beanDefinition.getFactoryMethodName();
|
||||
kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(factoryMethodName)).findFirst();
|
||||
}
|
||||
|
||||
if (kafkaStreamMethod.isPresent()) {
|
||||
Method method = kafkaStreamMethod.get();
|
||||
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
|
||||
@@ -101,6 +110,20 @@ public class FunctionDetectorCondition extends SpringBootCondition {
|
||||
prunedList.add(key);
|
||||
}
|
||||
}
|
||||
else {
|
||||
//check if it is a @Component bean.
|
||||
Optional<Method> componentBeanMethod = Arrays.stream(methods).filter(
|
||||
m -> (m.getName().equals("apply") || m.getName().equals("accept"))
|
||||
&& isKafkaStreamsTypeFound(m)).findFirst();
|
||||
if (componentBeanMethod.isPresent()) {
|
||||
Method method = componentBeanMethod.get();
|
||||
final ResolvableType resolvableType1 = ResolvableType.forMethodParameter(method, 0);
|
||||
final Class<?> rawClass = resolvableType1.getRawClass();
|
||||
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
|
||||
prunedList.add(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("Function not found: " + key, e);
|
||||
@@ -108,4 +131,10 @@ public class FunctionDetectorCondition extends SpringBootCondition {
|
||||
}
|
||||
return prunedList;
|
||||
}
|
||||
|
||||
private static boolean isKafkaStreamsTypeFound(Method method) {
|
||||
return KStream.class.isAssignableFrom(method.getParameters()[0].getType()) ||
|
||||
KTable.class.isAssignableFrom(method.getParameters()[0].getType()) ||
|
||||
GlobalKTable.class.isAssignableFrom(method.getParameters()[0].getType());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.function;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
@@ -72,15 +73,17 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
|
||||
private final ResolvableType type;
|
||||
|
||||
private final Method method;
|
||||
|
||||
private final String functionName;
|
||||
|
||||
private BeanFactory beanFactory;
|
||||
|
||||
|
||||
public KafkaStreamsBindableProxyFactory(ResolvableType type, String functionName) {
|
||||
public KafkaStreamsBindableProxyFactory(ResolvableType type, String functionName, Method method) {
|
||||
super(type.getType().getClass());
|
||||
this.type = type;
|
||||
this.functionName = functionName;
|
||||
this.method = method;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -89,12 +92,25 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
"'bindingTargetFactories' cannot be empty");
|
||||
|
||||
int resolvableTypeDepthCounter = 0;
|
||||
ResolvableType argument = this.type.getGeneric(resolvableTypeDepthCounter++);
|
||||
boolean isKafkaStreamsType = this.type.getRawClass().isAssignableFrom(KStream.class) ||
|
||||
this.type.getRawClass().isAssignableFrom(KTable.class) ||
|
||||
this.type.getRawClass().isAssignableFrom(GlobalKTable.class);
|
||||
ResolvableType argument = isKafkaStreamsType ? this.type : this.type.getGeneric(resolvableTypeDepthCounter++);
|
||||
List<String> inputBindings = buildInputBindings();
|
||||
Iterator<String> iterator = inputBindings.iterator();
|
||||
String next = iterator.next();
|
||||
bindInput(argument, next);
|
||||
|
||||
// Check if its a component style bean.
|
||||
if (method != null) {
|
||||
final Object bean = beanFactory.getBean(functionName);
|
||||
if (BiFunction.class.isAssignableFrom(bean.getClass()) || BiConsumer.class.isAssignableFrom(bean.getClass())) {
|
||||
argument = ResolvableType.forMethodParameter(method, 1);
|
||||
next = iterator.next();
|
||||
bindInput(argument, next);
|
||||
}
|
||||
}
|
||||
// Normal functional bean
|
||||
if (this.type.getRawClass() != null &&
|
||||
(this.type.getRawClass().isAssignableFrom(BiFunction.class) ||
|
||||
this.type.getRawClass().isAssignableFrom(BiConsumer.class))) {
|
||||
@@ -104,6 +120,9 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
}
|
||||
ResolvableType outboundArgument = this.type.getGeneric(resolvableTypeDepthCounter);
|
||||
|
||||
if (method != null) {
|
||||
outboundArgument = ResolvableType.forMethodReturnType(method);
|
||||
}
|
||||
while (isAnotherFunctionOrConsumerFound(outboundArgument)) {
|
||||
//The function is a curried function. We should introspect the partial function chain hierarchy.
|
||||
argument = outboundArgument.getGeneric(0);
|
||||
@@ -112,8 +131,7 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
outboundArgument = outboundArgument.getGeneric(1);
|
||||
}
|
||||
|
||||
//Introspect output for binding.
|
||||
if (outboundArgument != null && outboundArgument.getRawClass() != null && (!outboundArgument.isArray() &&
|
||||
if (outboundArgument.getRawClass() != null && (!outboundArgument.isArray() &&
|
||||
outboundArgument.getRawClass().isAssignableFrom(KStream.class))) {
|
||||
// if the type is array, we need to do a late binding as we don't know the number of
|
||||
// output bindings at this point in the flow.
|
||||
@@ -165,12 +183,31 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
int numberOfInputs = this.type.getRawClass() != null &&
|
||||
(this.type.getRawClass().isAssignableFrom(BiFunction.class) ||
|
||||
this.type.getRawClass().isAssignableFrom(BiConsumer.class)) ? 2 : getNumberOfInputs();
|
||||
|
||||
// For @Component style beans.
|
||||
if (method != null) {
|
||||
final ResolvableType returnType = ResolvableType.forMethodReturnType(method);
|
||||
Object bean = beanFactory.containsBean(functionName) ? beanFactory.getBean(functionName) : null;
|
||||
|
||||
if (bean != null && (BiFunction.class.isAssignableFrom(bean.getClass()) || BiConsumer.class.isAssignableFrom(bean.getClass()))) {
|
||||
numberOfInputs = 2;
|
||||
}
|
||||
else if (returnType.getRawClass().isAssignableFrom(Function.class) || returnType.getRawClass().isAssignableFrom(Consumer.class)) {
|
||||
numberOfInputs = 1;
|
||||
ResolvableType arg1 = returnType;
|
||||
|
||||
while (isAnotherFunctionOrConsumerFound(arg1)) {
|
||||
arg1 = arg1.getGeneric(1);
|
||||
numberOfInputs++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
while (i < numberOfInputs) {
|
||||
inputs.add(String.format("%s-%s-%d", this.functionName, FunctionConstants.DEFAULT_INPUT_SUFFIX, i++));
|
||||
}
|
||||
return inputs;
|
||||
|
||||
}
|
||||
|
||||
private int getNumberOfInputs() {
|
||||
@@ -182,7 +219,6 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
numberOfInputs++;
|
||||
}
|
||||
return numberOfInputs;
|
||||
|
||||
}
|
||||
|
||||
private void bindInput(ResolvableType arg0, String inputName) {
|
||||
@@ -191,13 +227,10 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
new BoundTargetHolder(getBindingTargetFactory(arg0.getRawClass())
|
||||
.createInput(inputName), true));
|
||||
}
|
||||
|
||||
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
|
||||
|
||||
RootBeanDefinition rootBeanDefinition = new RootBeanDefinition();
|
||||
rootBeanDefinition.setInstanceSupplier(() -> inputHolders.get(inputName).getBoundTarget());
|
||||
registry.registerBeanDefinition(inputName, rootBeanDefinition);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -38,7 +38,7 @@ public class KafkaStreamsFunctionAutoConfiguration {
|
||||
KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor,
|
||||
KafkaStreamsBindableProxyFactory[] kafkaStreamsBindableProxyFactories) {
|
||||
return new KafkaStreamsFunctionProcessorInvoker(kafkaStreamsFunctionBeanPostProcessor.getResolvableTypes(),
|
||||
kafkaStreamsFunctionProcessor, kafkaStreamsBindableProxyFactories);
|
||||
kafkaStreamsFunctionProcessor, kafkaStreamsBindableProxyFactories, kafkaStreamsFunctionBeanPostProcessor.getMethods());
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -40,6 +40,7 @@ import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
|
||||
import org.springframework.beans.factory.support.RootBeanDefinition;
|
||||
@@ -48,10 +49,8 @@ import org.springframework.core.ResolvableType;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @since 2.2.0
|
||||
*
|
||||
*/
|
||||
public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware {
|
||||
|
||||
@@ -62,6 +61,7 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
|
||||
private ConfigurableListableBeanFactory beanFactory;
|
||||
private boolean onlySingleFunction;
|
||||
private Map<String, ResolvableType> resolvableTypeMap = new TreeMap<>();
|
||||
private Map<String, Method> methods = new TreeMap<>();
|
||||
|
||||
private final StreamFunctionProperties streamFunctionProperties;
|
||||
|
||||
@@ -73,6 +73,10 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
|
||||
return this.resolvableTypeMap;
|
||||
}
|
||||
|
||||
public Map<String, Method> getMethods() {
|
||||
return methods;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
String[] functionNames = this.beanFactory.getBeanNamesForType(Function.class);
|
||||
@@ -98,6 +102,8 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
|
||||
.addGenericArgumentValue(getResolvableTypes().get(s));
|
||||
rootBeanDefinition.getConstructorArgumentValues()
|
||||
.addGenericArgumentValue(s);
|
||||
rootBeanDefinition.getConstructorArgumentValues()
|
||||
.addGenericArgumentValue(getMethods().get(s));
|
||||
registry.registerBeanDefinition("kafkaStreamsBindableProxyFactory-" + s, rootBeanDefinition);
|
||||
}
|
||||
}
|
||||
@@ -110,6 +116,12 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
|
||||
try {
|
||||
Method[] methods = classObj.getMethods();
|
||||
Optional<Method> kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(key)).findFirst();
|
||||
if (!kafkaStreamMethod.isPresent()) {
|
||||
final BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key);
|
||||
final String factoryMethodName = beanDefinition.getFactoryMethodName();
|
||||
kafkaStreamMethod = Arrays.stream(methods).filter(m -> m.getName().equals(factoryMethodName)).findFirst();
|
||||
}
|
||||
|
||||
if (kafkaStreamMethod.isPresent()) {
|
||||
Method method = kafkaStreamMethod.get();
|
||||
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
|
||||
@@ -119,12 +131,25 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
|
||||
resolvableTypeMap.put(key, resolvableType);
|
||||
}
|
||||
else {
|
||||
final String definition = streamFunctionProperties.getDefinition();
|
||||
if (definition == null) {
|
||||
throw new IllegalStateException("Multiple functions found, but function definition property is not set.");
|
||||
}
|
||||
else if (definition.contains(key)) {
|
||||
addResolvableTypeInfo(key, resolvableType);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
Optional<Method> componentBeanMethods = Arrays.stream(methods)
|
||||
.filter(m -> m.getName().equals("apply") && isKafkaStreamsTypeFound(m) ||
|
||||
m.getName().equals("accept") && isKafkaStreamsTypeFound(m)).findFirst();
|
||||
if (componentBeanMethods.isPresent()) {
|
||||
Method method = componentBeanMethods.get();
|
||||
final ResolvableType resolvableType = ResolvableType.forMethodParameter(method, 0);
|
||||
final Class<?> rawClass = resolvableType.getRawClass();
|
||||
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
|
||||
if (onlySingleFunction) {
|
||||
resolvableTypeMap.put(key, resolvableType);
|
||||
this.methods.put(key, method);
|
||||
}
|
||||
else {
|
||||
addResolvableTypeInfo(key, resolvableType);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -135,6 +160,22 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,
|
||||
}
|
||||
}
|
||||
|
||||
private void addResolvableTypeInfo(String key, ResolvableType resolvableType) {
|
||||
final String definition = streamFunctionProperties.getDefinition();
|
||||
if (definition == null) {
|
||||
throw new IllegalStateException("Multiple functions found, but function definition property is not set.");
|
||||
}
|
||||
else if (definition.contains(key)) {
|
||||
resolvableTypeMap.put(key, resolvableType);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isKafkaStreamsTypeFound(Method method) {
|
||||
return KStream.class.isAssignableFrom(method.getParameters()[0].getType()) ||
|
||||
KTable.class.isAssignableFrom(method.getParameters()[0].getType()) ||
|
||||
GlobalKTable.class.isAssignableFrom(method.getParameters()[0].getType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.function;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@@ -35,13 +36,16 @@ public class KafkaStreamsFunctionProcessorInvoker {
|
||||
private final KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor;
|
||||
private final Map<String, ResolvableType> resolvableTypeMap;
|
||||
private final KafkaStreamsBindableProxyFactory[] kafkaStreamsBindableProxyFactories;
|
||||
private final Map<String, Method> methods;
|
||||
|
||||
public KafkaStreamsFunctionProcessorInvoker(Map<String, ResolvableType> resolvableTypeMap,
|
||||
KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor,
|
||||
KafkaStreamsBindableProxyFactory[] kafkaStreamsBindableProxyFactories) {
|
||||
KafkaStreamsBindableProxyFactory[] kafkaStreamsBindableProxyFactories,
|
||||
Map<String, Method> methods) {
|
||||
this.kafkaStreamsFunctionProcessor = kafkaStreamsFunctionProcessor;
|
||||
this.resolvableTypeMap = resolvableTypeMap;
|
||||
this.kafkaStreamsBindableProxyFactories = kafkaStreamsBindableProxyFactories;
|
||||
this.methods = methods;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
@@ -49,7 +53,7 @@ public class KafkaStreamsFunctionProcessorInvoker {
|
||||
resolvableTypeMap.forEach((key, value) -> {
|
||||
Optional<KafkaStreamsBindableProxyFactory> proxyFactory =
|
||||
Arrays.stream(kafkaStreamsBindableProxyFactories).filter(p -> p.getFunctionName().equals(key)).findFirst();
|
||||
this.kafkaStreamsFunctionProcessor.setupFunctionInvokerForKafkaStreams(value, key, proxyFactory.get());
|
||||
this.kafkaStreamsFunctionProcessor.setupFunctionInvokerForKafkaStreams(value, key, proxyFactory.get(), methods.get(key));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,347 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.function;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class KafkaStreamsComponentBeansTests {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
|
||||
"testFunctionComponent-out", "testBiFunctionComponent-out", "testCurriedFunctionWithFunctionTerminal-out");
|
||||
|
||||
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
|
||||
|
||||
private static Consumer<String, String> consumer1;
|
||||
private static Consumer<String, String> consumer2;
|
||||
private static Consumer<String, String> consumer3;
|
||||
|
||||
private final static CountDownLatch LATCH_1 = new CountDownLatch(1);
|
||||
private final static CountDownLatch LATCH_2 = new CountDownLatch(2);
|
||||
private final static CountDownLatch LATCH_3 = new CountDownLatch(3);
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
|
||||
embeddedKafka);
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
|
||||
consumer1 = cf.createConsumer();
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer1, "testFunctionComponent-out");
|
||||
|
||||
Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("group-x", "false",
|
||||
embeddedKafka);
|
||||
consumerProps1.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
consumerProps1.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
DefaultKafkaConsumerFactory<String, String> cf1 = new DefaultKafkaConsumerFactory<>(consumerProps1);
|
||||
consumer2 = cf1.createConsumer();
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer2, "testBiFunctionComponent-out");
|
||||
|
||||
Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("group-y", "false",
|
||||
embeddedKafka);
|
||||
consumerProps2.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
consumerProps2.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
DefaultKafkaConsumerFactory<String, String> cf2 = new DefaultKafkaConsumerFactory<>(consumerProps2);
|
||||
consumer3 = cf2.createConsumer();
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer3, "testCurriedFunctionWithFunctionTerminal-out");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
consumer1.close();
|
||||
consumer2.close();
|
||||
consumer3.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFunctionComponent() {
|
||||
SpringApplication app = new SpringApplication(FunctionAsComponent.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
try (ConfigurableApplicationContext ignored = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.foo-in-0.destination=testFunctionComponent-in",
|
||||
"--spring.cloud.stream.bindings.foo-out-0.destination=testFunctionComponent-out",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("testFunctionComponent-in");
|
||||
template.sendDefault("foobar");
|
||||
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1, "testFunctionComponent-out");
|
||||
assertThat(cr.value().contains("foobarfoobar")).isTrue();
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerComponent() throws Exception {
|
||||
SpringApplication app = new SpringApplication(ConsumerAsComponent.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.bar-in-0.destination=testConsumerComponent-in",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("testConsumerComponent-in");
|
||||
template.sendDefault("foobar");
|
||||
Assert.isTrue(LATCH_1.await(10, TimeUnit.SECONDS), "bar");
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBiFunctionComponent() {
|
||||
SpringApplication app = new SpringApplication(BiFunctionAsComponent.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
try (ConfigurableApplicationContext ignored = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.bazz-in-0.destination=testBiFunctionComponent-in-0",
|
||||
"--spring.cloud.stream.bindings.bazz-in-1.destination=testBiFunctionComponent-in-1",
|
||||
"--spring.cloud.stream.bindings.bazz-out-0.destination=testBiFunctionComponent-out",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("testBiFunctionComponent-in-0");
|
||||
template.sendDefault("foobar");
|
||||
template.setDefaultTopic("testBiFunctionComponent-in-1");
|
||||
template.sendDefault("foobar");
|
||||
final ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer2, 10_000, 2);
|
||||
assertThat(records.count()).isEqualTo(2);
|
||||
records.forEach(stringStringConsumerRecord -> assertThat(stringStringConsumerRecord.value().contains("foobar")).isTrue());
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBiConsumerComponent() throws Exception {
|
||||
SpringApplication app = new SpringApplication(BiConsumerAsComponent.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.buzz-in-0.destination=testBiConsumerComponent-in-0",
|
||||
"--spring.cloud.stream.bindings.buzz-in-1.destination=testBiConsumerComponent-in-1",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("testBiConsumerComponent-in-0");
|
||||
template.sendDefault("foobar");
|
||||
template.setDefaultTopic("testBiConsumerComponent-in-1");
|
||||
template.sendDefault("foobar");
|
||||
Assert.isTrue(LATCH_2.await(10, TimeUnit.SECONDS), "bar");
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCurriedFunctionWithConsumerTerminal() throws Exception {
|
||||
SpringApplication app = new SpringApplication(CurriedFunctionWithConsumerTerminal.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.curriedConsumer-in-0.destination=testCurriedFunctionWithConsumerTerminal-in-0",
|
||||
"--spring.cloud.stream.bindings.curriedConsumer-in-1.destination=testCurriedFunctionWithConsumerTerminal-in-1",
|
||||
"--spring.cloud.stream.bindings.curriedConsumer-in-2.destination=testCurriedFunctionWithConsumerTerminal-in-2",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("testCurriedFunctionWithConsumerTerminal-in-0");
|
||||
template.sendDefault("foobar");
|
||||
template.setDefaultTopic("testCurriedFunctionWithConsumerTerminal-in-1");
|
||||
template.sendDefault("foobar");
|
||||
template.setDefaultTopic("testCurriedFunctionWithConsumerTerminal-in-2");
|
||||
template.sendDefault("foobar");
|
||||
Assert.isTrue(LATCH_3.await(10, TimeUnit.SECONDS), "bar");
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCurriedFunctionWithFunctionTerminal() {
|
||||
SpringApplication app = new SpringApplication(CurriedFunctionWithFunctionTerminal.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.curriedFunction-in-0.destination=testCurriedFunctionWithFunctionTerminal-in-0",
|
||||
"--spring.cloud.stream.bindings.curriedFunction-in-1.destination=testCurriedFunctionWithFunctionTerminal-in-1",
|
||||
"--spring.cloud.stream.bindings.curriedFunction-in-2.destination=testCurriedFunctionWithFunctionTerminal-in-2",
|
||||
"--spring.cloud.stream.bindings.curriedFunction-out-0.destination=testCurriedFunctionWithFunctionTerminal-out",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("testCurriedFunctionWithFunctionTerminal-in-0");
|
||||
template.sendDefault("foobar");
|
||||
template.setDefaultTopic("testCurriedFunctionWithFunctionTerminal-in-1");
|
||||
template.sendDefault("foobar");
|
||||
template.setDefaultTopic("testCurriedFunctionWithFunctionTerminal-in-2");
|
||||
template.sendDefault("foobar");
|
||||
final ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer3, 10_000, 3);
|
||||
assertThat(records.count()).isEqualTo(3);
|
||||
records.forEach(stringStringConsumerRecord -> assertThat(stringStringConsumerRecord.value().contains("foobar")).isTrue());
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Component("foo")
|
||||
@EnableAutoConfiguration
|
||||
public static class FunctionAsComponent implements Function<KStream<Integer, String>,
|
||||
KStream<String, String>> {
|
||||
|
||||
@Override
|
||||
public KStream<String, String> apply(KStream<Integer, String> stringIntegerKStream) {
|
||||
return stringIntegerKStream.map((integer, s) -> new KeyValue<>(s, s + s));
|
||||
}
|
||||
}
|
||||
|
||||
@Component("bar")
|
||||
@EnableAutoConfiguration
|
||||
public static class ConsumerAsComponent implements java.util.function.Consumer<KStream<Integer, String>> {
|
||||
|
||||
@Override
|
||||
public void accept(KStream<Integer, String> integerStringKStream) {
|
||||
integerStringKStream.foreach((integer, s) -> LATCH_1.countDown());
|
||||
}
|
||||
}
|
||||
|
||||
@Component("bazz")
|
||||
@EnableAutoConfiguration
|
||||
public static class BiFunctionAsComponent implements BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> {
|
||||
|
||||
@Override
|
||||
public KStream<String, String> apply(KStream<String, String> stringStringKStream, KStream<String, String> stringStringKStream2) {
|
||||
return stringStringKStream.merge(stringStringKStream2);
|
||||
}
|
||||
}
|
||||
|
||||
@Component("buzz")
|
||||
@EnableAutoConfiguration
|
||||
public static class BiConsumerAsComponent implements BiConsumer<KStream<String, String>, KStream<String, String>> {
|
||||
|
||||
@Override
|
||||
public void accept(KStream<String, String> stringStringKStream, KStream<String, String> stringStringKStream2) {
|
||||
final KStream<String, String> merged = stringStringKStream.merge(stringStringKStream2);
|
||||
merged.foreach((s, s2) -> LATCH_2.countDown());
|
||||
}
|
||||
}
|
||||
|
||||
@Component("curriedConsumer")
|
||||
@EnableAutoConfiguration
|
||||
public static class CurriedFunctionWithConsumerTerminal implements Function<KStream<String, String>,
|
||||
Function<KStream<String, String>,
|
||||
java.util.function.Consumer<KStream<String, String>>>> {
|
||||
|
||||
@Override
|
||||
public Function<KStream<String, String>, java.util.function.Consumer<KStream<String, String>>> apply(KStream<String, String> stringStringKStream) {
|
||||
return stringStringKStream1 -> stringStringKStream2 -> {
|
||||
final KStream<String, String> merge1 = stringStringKStream.merge(stringStringKStream1);
|
||||
final KStream<String, String> merged2 = merge1.merge(stringStringKStream2);
|
||||
merged2.foreach((s1, s2) -> LATCH_3.countDown());
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Component("curriedFunction")
|
||||
@EnableAutoConfiguration
|
||||
public static class CurriedFunctionWithFunctionTerminal implements Function<KStream<String, String>,
|
||||
Function<KStream<String, String>,
|
||||
java.util.function.Function<KStream<String, String>, KStream<String, String>>>> {
|
||||
|
||||
@Override
|
||||
public Function<KStream<String, String>, Function<KStream<String, String>, KStream<String, String>>> apply(KStream<String, String> stringStringKStream) {
|
||||
return stringStringKStream1 -> stringStringKStream2 -> {
|
||||
final KStream<String, String> merge1 = stringStringKStream.merge(stringStringKStream1);
|
||||
return merge1.merge(stringStringKStream2);
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,10 +60,10 @@ public class KafkaStreamsFunctionStateStoreTests {
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.function.definition=process;hello",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=words",
|
||||
"--spring.cloud.stream.function.definition=biConsumerBean;hello",
|
||||
"--spring.cloud.stream.bindings.biConsumerBean-in-0.destination=words",
|
||||
"--spring.cloud.stream.bindings.hello-in-0.destination=words",
|
||||
"--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=testKafkaStreamsFuncionWithMultipleStateStores-123",
|
||||
"--spring.cloud.stream.kafka.streams.binder.functions.changed.applicationId=testKafkaStreamsFuncionWithMultipleStateStores-123",
|
||||
"--spring.cloud.stream.kafka.streams.binder.functions.hello.applicationId=testKafkaStreamsFuncionWithMultipleStateStores-456",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
|
||||
@@ -121,7 +121,7 @@ public class KafkaStreamsFunctionStateStoreTests {
|
||||
boolean processed1;
|
||||
boolean processed2;
|
||||
|
||||
@Bean
|
||||
@Bean(name = "biConsumerBean")
|
||||
public java.util.function.BiConsumer<KStream<Object, String>, KStream<Object, String>> process() {
|
||||
return (input0, input1) ->
|
||||
input0.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
|
||||
|
||||
Reference in New Issue
Block a user