Support KStream lifecycle through binding endpoint (#1042)

* Support KStream lifecycle through binding endpoint

Introduce the ability for Kafka Streams application's lifecycle
management through actuator binding endpoints. Kafka Streams
only supports STOP and START operations. PAUSE/RESUME operations
that is available in regular message channel based binders
are not available in Kafka Streams binder.

Adding tests and docs.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1038
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/850

https://stackoverflow.com/questions/60282225/why-doesnt-kstreambinder-have-a-lifecycle-for-defaultbinding

* Addressing PR review comments

* Addressing PR review

* cleanup unused code
This commit is contained in:
Soby Chacko
2021-03-15 16:14:19 -04:00
committed by GitHub
parent a7299df63f
commit 7cc001ac4c
13 changed files with 379 additions and 55 deletions

View File

@@ -1640,6 +1640,118 @@ For instance, if we want to change the header key on this binding to `my_event`
`spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event`.
=== Binding visualization and control in Kafka Streams binder
Starting with version 3.1.2, Kafka Streams binder supports binding visualization and control.
The only two lifecycle phases supported are `STOPPED` and `STARTED`.
The lifecycle phases `PAUSED` and `RESUMED` are not available in Kafka Streams binder.
In order to activate binding visualization and control, the application needs to include the following two dependencies.
```
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
```
If you prefer using webflux, you can then include `spring-boot-starter-webflux` instead of the standard web dependency.
In addition, you also need to set the following property:
```
management.endpoints.web.exposure.include=bindings
```
To illustrate this feature further, let us use the following application as a guide:
```
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
```
As we can see, the application has two Kafka Streams functions - one, a consumer and another a function.
The consumer binding is named by default as `consumer-in-0`.
Similarly, for the function, the input binding is `function-in-0` and the output binding is `function-out-0`.
Once the application is started, we can find details about the bindings using the following bindings endpoint.
```
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
```
The details about all three bindings can be found above.
Let us now stop the consumer-in-0 binding.
```
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
```
At this point, no records will be received through this binding.
Start the binding again.
```
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
```
When there are multiple bindings present on a single function, invoking these operations on any of those bindings will work.
This is because all the bindings on a single function are backed by the same `StreamsBuilderFactoryBean`.
Therefore, for the function above, either `function-in-0` or `function-out-0` will work.
=== Configuration Options
This section contains the configuration options used by the Kafka Streams binder.

View File

@@ -160,7 +160,7 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
(KTableBoundElementFactory.KTableWrapper) targetBean;
//wrap the proxy created during the initial target type binding with real object (KTable)
kTableWrapper.wrap((KTable<Object, Object>) table);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
arguments[index] = table;
}
else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
@@ -172,7 +172,7 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
(GlobalKTableBoundElementFactory.GlobalKTableWrapper) targetBean;
//wrap the proxy created during the initial target type binding with real object (KTable)
globalKTableWrapper.wrap((GlobalKTable<Object, Object>) table);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
arguments[index] = table;
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -30,6 +32,8 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;
@@ -78,13 +82,30 @@ public class GlobalKTableBinder extends
group = properties.getExtension().getApplicationId();
}
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
final String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget);
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);
KafkaStreamsBinderUtils.prepareConsumerBinding(name, group,
getApplicationContext(), this.kafkaTopicProvisioner,
this.binderConfigurationProperties, properties, retryTemplate, getBeanFactory(),
this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget),
this.kafkaStreamsBindingInformationCatalogue);
this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
return new DefaultBinding<>(name, group, inputTarget, null);
return new DefaultBinding<GlobalKTable<Object, Object>>(bindingName, group, inputTarget, streamsBuilderFactoryBean) {
@Override
public boolean isInput() {
return true;
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};
}
@Override

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
@@ -38,6 +40,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;
@@ -76,10 +79,10 @@ class KStreamBinder extends
private final KeyValueSerdeResolver keyValueSerdeResolver;
KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
@@ -103,13 +106,31 @@ class KStreamBinder extends
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
final String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget);
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);
KafkaStreamsBinderUtils.prepareConsumerBinding(name, group,
getApplicationContext(), this.kafkaTopicProvisioner,
this.binderConfigurationProperties, properties, retryTemplate, getBeanFactory(),
this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget),
this.kafkaStreamsBindingInformationCatalogue);
this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
return new DefaultBinding<>(name, group, inputTarget, null);
return new DefaultBinding<KStream<Object, Object>>(bindingName, group,
inputTarget, streamsBuilderFactoryBean) {
@Override
public boolean isInput() {
return true;
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};
}
@Override
@@ -138,7 +159,31 @@ class KStreamBinder extends
to(properties.isUseNativeEncoding(), name, outboundBindTarget,
(Serde<Object>) keySerde, (Serde<Object>) valueSerde, properties.getExtension());
return new DefaultBinding<>(name, null, outboundBindTarget, null);
final String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(outboundBindTarget);
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);
// We need the application id to pass to DefaultBinding so that it won't be interpreted as an anonymous group.
// In case, if we can't find application.id (which is unlikely), we just default to bindingName.
// This will only be used for lifecycle management through actuator endpoints.
final Properties streamsConfiguration = streamsBuilderFactoryBean.getStreamsConfiguration();
final String applicationId = streamsConfiguration != null ? (String) streamsConfiguration.get("application.id") : bindingName;
return new DefaultBinding<KStream<Object, Object>>(bindingName,
applicationId, outboundBindTarget, streamsBuilderFactoryBean) {
@Override
public boolean isInput() {
return false;
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};
}
@SuppressWarnings("unchecked")

View File

@@ -30,6 +30,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;
@@ -81,13 +82,30 @@ class KTableBinder extends
}
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
final String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget);
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);
KafkaStreamsBinderUtils.prepareConsumerBinding(name, group,
getApplicationContext(), this.kafkaTopicProvisioner,
this.binderConfigurationProperties, properties, retryTemplate, getBeanFactory(),
this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget),
this.kafkaStreamsBindingInformationCatalogue);
this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
return new DefaultBinding<>(name, group, inputTarget, null);
return new DefaultBinding<KTable<Object, Object>>(bindingName, group, inputTarget, streamsBuilderFactoryBean) {
@Override
public boolean isInput() {
return true;
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};
}
@Override

View File

@@ -92,11 +92,13 @@ public class KafkaStreamsBinderMetrics {
this.meterBinder = registry -> {
if (streamsBuilderFactoryBeans != null) {
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
final Map<MetricName, ? extends Metric> metrics = kafkaStreams.metrics();
if (streamsBuilderFactoryBean.isRunning()) {
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
final Map<MetricName, ? extends Metric> metrics = kafkaStreams.metrics();
prepareToBindMetrics(registry, metrics);
checkAndBindMetrics(registry, metrics);
prepareToBindMetrics(registry, metrics);
checkAndBindMetrics(registry, metrics);
}
}
}
};

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
@@ -28,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
@@ -44,6 +46,7 @@ import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolve
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
@@ -51,6 +54,7 @@ import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -74,7 +78,8 @@ final class KafkaStreamsBinderUtils {
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties,
RetryTemplate retryTemplate,
ConfigurableListableBeanFactory beanFactory, String bindingName,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties =
(ExtendedConsumerProperties) properties;
@@ -110,7 +115,7 @@ final class KafkaStreamsBinderUtils {
new ExtendedProducerProperties<>(
extendedConsumerProperties.getExtension().getDlqProducerProperties()),
binderConfigurationProperties);
kafkaStreamsBindingInformationCatalogue.addDlqProducerFactory(producerFactory);
kafkaStreamsBindingInformationCatalogue.addDlqProducerFactory(streamsBuilderFactoryBean, producerFactory);
KafkaOperations<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
@@ -201,4 +206,23 @@ final class KafkaStreamsBinderUtils {
return KStream.class.isAssignableFrom(targetBeanClass)
&& KStream.class.isAssignableFrom(methodParameter.getParameterType());
}
static void closeDlqProducerFactories(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final List<ProducerFactory<byte[], byte[]>> dlqProducerFactories =
kafkaStreamsBindingInformationCatalogue.getDlqProducerFactory(streamsBuilderFactoryBean);
if (!CollectionUtils.isEmpty(dlqProducerFactories)) {
for (ProducerFactory<byte[], byte[]> producerFactory : dlqProducerFactories) {
try {
((DisposableBean) producerFactory).destroy();
}
catch (Exception exception) {
throw new IllegalStateException(exception);
}
}
}
}
}

View File

@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
@@ -34,6 +35,7 @@ import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.CollectionUtils;
/**
* A catalogue that provides binding information for Kafka Streams target types such as
@@ -50,7 +52,7 @@ public class KafkaStreamsBindingInformationCatalogue {
private final Map<KStream<?, ?>, KafkaStreamsConsumerProperties> consumerProperties = new ConcurrentHashMap<>();
private final Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = new HashSet<>();
private final Map<String, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanPerBinding = new HashMap<>();
private final Map<Object, ResolvableType> outboundKStreamResolvables = new HashMap<>();
@@ -58,7 +60,7 @@ public class KafkaStreamsBindingInformationCatalogue {
private final Map<Object, String> bindingNamesPerTarget = new HashMap<>();
private final List<ProducerFactory<byte[], byte[]>> dlqProducerFactories = new ArrayList<>();
private final Map<StreamsBuilderFactoryBean, List<ProducerFactory<byte[], byte[]>>> dlqProducerFactories = new HashMap<>();
/**
* For a given bounded {@link KStream}, retrieve it's corresponding destination on the
@@ -127,19 +129,19 @@ public class KafkaStreamsBindingInformationCatalogue {
}
}
/**
* Adds a mapping for KStream -> {@link StreamsBuilderFactoryBean}.
* @param streamsBuilderFactoryBean provides the {@link StreamsBuilderFactoryBean}
* mapped to the KStream
*/
void addStreamBuilderFactory(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
this.streamsBuilderFactoryBeans.add(streamsBuilderFactoryBean);
Set<StreamsBuilderFactoryBean> getStreamsBuilderFactoryBeans() {
return new HashSet<>(this.streamsBuilderFactoryBeanPerBinding.values());
}
Set<StreamsBuilderFactoryBean> getStreamsBuilderFactoryBeans() {
return this.streamsBuilderFactoryBeans;
void addStreamBuilderFactoryPerBinding(String binding, StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
this.streamsBuilderFactoryBeanPerBinding.put(binding, streamsBuilderFactoryBean);
}
Map<String, StreamsBuilderFactoryBean> getStreamsBuilderFactoryBeanPerBinding() {
return this.streamsBuilderFactoryBeanPerBinding;
}
void addOutboundKStreamResolvable(Object key, ResolvableType outboundResolvable) {
this.outboundKStreamResolvables.put(key, outboundResolvable);
}
@@ -182,10 +184,23 @@ public class KafkaStreamsBindingInformationCatalogue {
}
public List<ProducerFactory<byte[], byte[]>> getDlqProducerFactories() {
return this.dlqProducerFactories;
return this.dlqProducerFactories.values()
.stream()
.flatMap(List::stream)
.collect(Collectors.toList());
}
public void addDlqProducerFactory(ProducerFactory<byte[], byte[]> producerFactory) {
this.dlqProducerFactories.add(producerFactory);
public List<ProducerFactory<byte[], byte[]>> getDlqProducerFactory(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
return this.dlqProducerFactories.get(streamsBuilderFactoryBean);
}
public void addDlqProducerFactory(StreamsBuilderFactoryBean streamsBuilderFactoryBean,
ProducerFactory<byte[], byte[]> producerFactory) {
List<ProducerFactory<byte[], byte[]>> producerFactories = this.dlqProducerFactories.get(streamsBuilderFactoryBean);
if (CollectionUtils.isEmpty(producerFactories)) {
producerFactories = new ArrayList<>();
this.dlqProducerFactories.put(streamsBuilderFactoryBean, producerFactories);
}
producerFactories.add(producerFactory);
}
}

View File

@@ -273,14 +273,14 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
final Set<String> outputs = new TreeSet<>(kafkaStreamsBindableProxyFactory.getOutputs());
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
if (result.getClass().isArray()) {
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory, outboundResolvableType, (Object[]) result);
final String initialInput = resolvableTypes.keySet().iterator().next();
final StreamsBuilderFactoryBean streamsBuilderFactoryBean =
this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(initialInput);
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory,
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
}
else {
if (outboundDefinitionIterator.hasNext()) {
Object targetBean = handleSingleKStreamOutbound((KStream) result, outboundDefinitionIterator);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean,
outboundResolvableType);
}
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
}
}
}
@@ -311,14 +311,15 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
if (result.getClass().isArray()) {
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory, outboundResolvableType, (Object[]) result);
final String initialInput = resolvableTypes.keySet().iterator().next();
final StreamsBuilderFactoryBean streamsBuilderFactoryBean =
this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(initialInput);
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory,
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
}
else {
if (outboundDefinitionIterator.hasNext()) {
Object targetBean = handleSingleKStreamOutbound((KStream) result, outboundDefinitionIterator);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
}
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
}
}
}
@@ -328,8 +329,22 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
}
}
private Object handleSingleKStreamOutbound(KStream result, Iterator<String> outboundDefinitionIterator) {
final String next = outboundDefinitionIterator.next();
private void handleSingleKStreamOutbound(Map<String, ResolvableType> resolvableTypes, ResolvableType outboundResolvableType,
KStream<Object, Object> result, Iterator<String> outboundDefinitionIterator) {
if (outboundDefinitionIterator.hasNext()) {
String outbound = outboundDefinitionIterator.next();
Object targetBean = handleSingleKStreamOutbound(result, outbound);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean,
outboundResolvableType);
final String next = resolvableTypes.keySet().iterator().next();
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(next);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(outbound, streamsBuilderFactoryBean);
}
}
private Object handleSingleKStreamOutbound(KStream<Object, Object> result, String next) {
Object targetBean = this.applicationContext.getBean(next);
KStreamBoundElementFactory.KStreamWrapper
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
@@ -340,7 +355,8 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
@SuppressWarnings({ "unchecked", "rawtypes" })
private void handleKStreamArrayOutbound(ResolvableType resolvableType, String functionName,
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory,
ResolvableType outboundResolvableType, Object[] result) {
ResolvableType outboundResolvableType, Object[] result,
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
// Binding target as the output bindings were deferred in the KafkaStreamsBindableProxyFactory
// due to the fact that it didn't know the returned array size. At this point in the execution,
// we know exactly the number of outbound components (from the array length), so do the binding.
@@ -365,6 +381,8 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(next, streamsBuilderFactoryBean);
}
}
@@ -428,7 +446,8 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
kStreamWrapper.wrap((KStream<Object, Object>) stream);
this.kafkaStreamsBindingInformationCatalogue.addKeySerde((KStream<?, ?>) kStreamWrapper, keySerde);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
if (KStream.class.isAssignableFrom(stringResolvableTypeMap.get(input).getRawClass())) {
final Class<?> valueClass =

View File

@@ -42,7 +42,14 @@ public class KafkaStreamsRegistry {
private final Set<KafkaStreams> kafkaStreams = new HashSet<>();
Set<KafkaStreams> getKafkaStreams() {
return this.kafkaStreams;
Set<KafkaStreams> currentlyRunningKafkaStreams = new HashSet<>();
for (KafkaStreams ks : this.kafkaStreams) {
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = streamsBuilderFactoryBeanMap.get(ks);
if (streamsBuilderFactoryBean.isRunning()) {
currentlyRunningKafkaStreams.add(ks);
}
}
return currentlyRunningKafkaStreams;
}
/**
@@ -67,7 +74,7 @@ public class KafkaStreamsRegistry {
public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
.stream()
.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean.isRunning() && streamsBuilderFactoryBean
.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals(applicationId))
.findFirst();

View File

@@ -185,17 +185,29 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
}
if (methodAnnotatedOutboundNames != null && methodAnnotatedOutboundNames.length > 0) {
methodAnnotatedInboundName = populateInboundIfMissing(method, methodAnnotatedInboundName);
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(methodAnnotatedInboundName);
if (result.getClass().isArray()) {
Object[] outboundKStreams = (Object[]) result;
int i = 0;
for (Object outboundKStream : outboundKStreams) {
final String methodAnnotatedOutboundName = methodAnnotatedOutboundNames[i++];
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(
methodAnnotatedOutboundName, streamsBuilderFactoryBean);
Object targetBean = this.applicationContext
.getBean(methodAnnotatedOutboundNames[i++]);
.getBean(methodAnnotatedOutboundName);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
adaptStreamListenerResult(outboundKStream, targetBean);
}
}
else {
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(
methodAnnotatedOutboundNames[0], streamsBuilderFactoryBean);
Object targetBean = this.applicationContext
.getBean(methodAnnotatedOutboundNames[0]);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
@@ -210,6 +222,21 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
}
}
private String populateInboundIfMissing(Method method, String methodAnnotatedInboundName) {
if (!StringUtils.hasText(methodAnnotatedInboundName)) {
Object[] arguments = new Object[method.getParameterTypes().length];
if (arguments.length > 0) {
MethodParameter methodParameter = MethodParameter.forExecutable(method, 0);
if (methodParameter.hasParameterAnnotation(Input.class)) {
Input methodAnnotation = methodParameter
.getParameterAnnotation(Input.class);
methodAnnotatedInboundName = methodAnnotation.value();
}
}
}
return methodAnnotatedInboundName;
}
@SuppressWarnings("unchecked")
private void adaptStreamListenerResult(Object outboundKStream, Object targetBean) {
for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
@@ -292,8 +319,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
BindingProperties bindingProperties1 = this.kafkaStreamsBindingInformationCatalogue.getBindingProperties().get(kStreamWrapper);
this.kafkaStreamsBindingInformationCatalogue.registerBindingProperties(stream, bindingProperties1);
this.kafkaStreamsBindingInformationCatalogue
.addStreamBuilderFactory(streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(inboundName, streamsBuilderFactoryBean);
for (StreamListenerParameterAdapter streamListenerParameterAdapter : adapters) {
if (streamListenerParameterAdapter.supports(stream.getClass(),
methodParameter)) {

View File

@@ -102,6 +102,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
Mockito.when(mock.getKafkaStreams()).thenReturn(mockKafkaStreams);
KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
kafkaStreamsRegistry.registerKafkaStreams(mock);
Mockito.when(mock.isRunning()).thenReturn(true);
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);

View File

@@ -17,9 +17,11 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -40,15 +42,22 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpoint;
import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.cloud.stream.binding.OutputBindingLifecycle;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -90,7 +99,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
@Test
@SuppressWarnings("unchecked")
public void testKstreamWordCountFunction() throws Exception {
public void testBasicKStreamTopologyExecution() throws Exception {
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
@@ -132,6 +141,31 @@ public class KafkaStreamsBinderWordCountFunctionTests {
Map<String, Object> streamConfigGlobalProperties = (Map<String, Object>) context.getBean("streamConfigGlobalProperties");
assertThat(streamConfigGlobalProperties.get("request.timeout.ms")).isEqualTo("29000");
assertThat(streamConfigGlobalProperties.get("max.block.ms")).isEqualTo("90000");
InputBindingLifecycle inputBindingLifecycle = context.getBean(InputBindingLifecycle.class);
final Collection<Binding<Object>> inputBindings = (Collection<Binding<Object>>) new DirectFieldAccessor(inputBindingLifecycle)
.getPropertyValue("inputBindings");
assertThat(inputBindings).isNotNull();
final Optional<Binding<Object>> theOnlyInputBinding = inputBindings.stream().findFirst();
assertThat(theOnlyInputBinding.isPresent()).isTrue();
final DefaultBinding<Object> objectBinding = (DefaultBinding<Object>) theOnlyInputBinding.get();
assertThat(objectBinding.getBindingName()).isEqualTo("process-in-0");
final Lifecycle lifecycle = (Lifecycle) new DirectFieldAccessor(objectBinding).getPropertyValue("lifecycle");
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean(StreamsBuilderFactoryBean.class);
assertThat(lifecycle).isEqualTo(streamsBuilderFactoryBean);
OutputBindingLifecycle outputBindingLifecycle = context.getBean(OutputBindingLifecycle.class);
final Collection<Binding<Object>> outputBindings = (Collection<Binding<Object>>) new DirectFieldAccessor(outputBindingLifecycle)
.getPropertyValue("outputBindings");
assertThat(outputBindings).isNotNull();
final Optional<Binding<Object>> theOnlyOutputBinding = outputBindings.stream().findFirst();
assertThat(theOnlyOutputBinding.isPresent()).isTrue();
final DefaultBinding<Object> objectBinding1 = (DefaultBinding<Object>) theOnlyOutputBinding.get();
assertThat(objectBinding1.getBindingName()).isEqualTo("process-out-0");
final Lifecycle lifecycle1 = (Lifecycle) new DirectFieldAccessor(objectBinding1).getPropertyValue("lifecycle");
assertThat(lifecycle1).isEqualTo(streamsBuilderFactoryBean);
}
}