Kafka Streams binder name changes

- Rename spring-cloud-stream-binder-kstream to spring-cloud-stream-binder-kafka-streams
 - Corresponding changes in maven pom.xml files
 - Rename relevant classes to prefix with KafkaStreams instead of KStream
 - Corresponding package changes from org.springframework.cloud.stream.kstream to
   org.springframework.cloud.stream.kafka.streams
 - Organize all the configuration property classes in a properties package
 - Remove kstream from all the properties exposed by Apache Kafka Streams binder
 - Classes that need not be public are now moved to package access level
 - Test changes
 - More javadocs to classes

Resolves #246
This commit is contained in:
Soby Chacko
2018-02-12 12:00:42 -05:00
parent 72e2aeec2a
commit a5344655cb
47 changed files with 729 additions and 622 deletions

View File

@@ -22,7 +22,7 @@
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-core</module>
<module>spring-cloud-stream-binder-kstream</module>
<module>spring-cloud-stream-binder-kafka-streams</module>
</modules>
<dependencyManagement>

View File

@@ -2,9 +2,9 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kstream</name>
<name>spring-cloud-stream-binder-kafka-streams</name>
<description>Kafka Streams Binder Implementation</description>
<parent>

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,43 +33,48 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kstream.config.KStreamBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.util.StringUtils;
/**
* {@link org.springframework.cloud.stream.binder.Binder} implementation for {@link KStream}.
* This implemenation extends from the {@link AbstractBinder} directly.
*
* Provides both producer and consumer bindings for the bound KStream.
*
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class KStreamBinder extends
AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KStreamConsumerProperties>, ExtendedProducerProperties<KStreamProducerProperties>>
implements ExtendedPropertiesBinder<KStream<Object, Object>, KStreamConsumerProperties, KStreamProducerProperties> {
class KStreamBinder extends
AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
implements ExtendedPropertiesBinder<KStream<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
private final static Log LOG = LogFactory.getLog(KStreamBinder.class);
private final KafkaTopicProvisioner kafkaTopicProvisioner;
private KStreamExtendedBindingProperties kStreamExtendedBindingProperties = new KStreamExtendedBindingProperties();
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
private final KStreamBinderConfigurationProperties binderConfigurationProperties;
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
private final KStreamBoundMessageConversionDelegate kStreamBoundMessageConversionDelegate;
private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
private final KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue;
private final KeyValueSerdeResolver keyValueSerdeResolver;
public KStreamBinder(KStreamBinderConfigurationProperties binderConfigurationProperties,
KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KStreamBoundMessageConversionDelegate kStreamBoundMessageConversionDelegate,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kStreamBoundMessageConversionDelegate = kStreamBoundMessageConversionDelegate;
this.KStreamBindingInformationCatalogue = KStreamBindingInformationCatalogue;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.KafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.keyValueSerdeResolver = keyValueSerdeResolver;
}
@@ -77,29 +82,29 @@ public class KStreamBinder extends
@SuppressWarnings("unchecked")
protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
KStream<Object, Object> inputTarget,
ExtendedConsumerProperties<KStreamConsumerProperties> properties) {
this.KStreamBindingInformationCatalogue.registerConsumerProperties(inputTarget, properties.getExtension());
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
this.KafkaStreamsBindingInformationCatalogue.registerConsumerProperties(inputTarget, properties.getExtension());
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
properties.getExtension());
if (binderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.sendToDlq) {
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
extendedConsumerProperties.getExtension().setEnableDlq(true);
}
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
}
this.kafkaTopicProvisioner.provisionConsumerDestination(name, group, extendedConsumerProperties);
StreamsConfig streamsConfig = this.KStreamBindingInformationCatalogue.getStreamsConfig(inputTarget);
StreamsConfig streamsConfig = this.KafkaStreamsBindingInformationCatalogue.getStreamsConfig(inputTarget);
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
String dlqName = StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName()) ?
"error." + name + "." + group : extendedConsumerProperties.getExtension().getDlqName();
KStreamDlqDispatch kStreamDlqDispatch = new KStreamDlqDispatch(dlqName, binderConfigurationProperties,
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = new KafkaStreamsDlqDispatch(dlqName, binderConfigurationProperties,
extendedConsumerProperties.getExtension());
SendToDlqAndContinue sendToDlqAndContinue = this.getApplicationContext().getBean(SendToDlqAndContinue.class);
sendToDlqAndContinue.addKStreamDlqDispatch(name, kStreamDlqDispatch);
sendToDlqAndContinue.addKStreamDlqDispatch(name, kafkaStreamsDlqDispatch);
DeserializationExceptionHandler deserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
if(deserializationExceptionHandler instanceof SendToDlqAndContinue) {
((SendToDlqAndContinue)deserializationExceptionHandler).addKStreamDlqDispatch(name, kStreamDlqDispatch);
((SendToDlqAndContinue)deserializationExceptionHandler).addKStreamDlqDispatch(name, kafkaStreamsDlqDispatch);
}
}
return new DefaultBinding<>(name, group, inputTarget, null);
@@ -108,7 +113,7 @@ public class KStreamBinder extends
@Override
@SuppressWarnings("unchecked")
protected Binding<KStream<Object, Object>> doBindProducer(String name, KStream<Object, Object> outboundBindTarget,
ExtendedProducerProperties<KStreamProducerProperties> properties) {
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);
@@ -123,7 +128,7 @@ public class KStreamBinder extends
Serde<Object> keySerde, Serde<Object> valueSerde) {
if (!isNativeEncoding) {
LOG.info("Native encoding is disabled for " + name + ". Outbound message conversion done by Spring Cloud Stream.");
kStreamBoundMessageConversionDelegate.serializeOnOutbound(outboundBindTarget)
kafkaStreamsMessageConversionDelegate.serializeOnOutbound(outboundBindTarget)
.to(name, Produced.with(keySerde, valueSerde));
}
else {
@@ -133,16 +138,16 @@ public class KStreamBinder extends
}
@Override
public KStreamConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.kStreamExtendedBindingProperties.getExtendedConsumerProperties(channelName);
public KafkaStreamsConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public KStreamProducerProperties getExtendedProducerProperties(String channelName) {
return this.kStreamExtendedBindingProperties.getExtendedProducerProperties(channelName);
public KafkaStreamsProducerProperties getExtendedProducerProperties(String channelName) {
return this.kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties(channelName);
}
public void setkStreamExtendedBindingProperties(KStreamExtendedBindingProperties kStreamExtendedBindingProperties) {
this.kStreamExtendedBindingProperties = kStreamExtendedBindingProperties;
public void setKafkaStreamsExtendedBindingProperties(KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -23,10 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kstream.KStreamBinder;
import org.springframework.cloud.stream.binder.kstream.KStreamBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kstream.KStreamBoundMessageConversionDelegate;
import org.springframework.cloud.stream.binder.kstream.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -44,7 +42,7 @@ public class KStreamBinderConfiguration {
private KafkaProperties kafkaProperties;
@Autowired
private KStreamExtendedBindingProperties kStreamExtendedBindingProperties;
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties) {
@@ -52,15 +50,15 @@ public class KStreamBinderConfiguration {
}
@Bean
public KStreamBinder kStreamBinder(KStreamBinderConfigurationProperties binderConfigurationProperties,
public KStreamBinder kStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KStreamBoundMessageConversionDelegate KStreamBoundMessageConversionDelegate,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties, kafkaTopicProvisioner,
KStreamBoundMessageConversionDelegate, KStreamBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue,
keyValueSerdeResolver);
kStreamBinder.setkStreamExtendedBindingProperties(kStreamExtendedBindingProperties);
kStreamBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
return kStreamBinder;
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
@@ -27,20 +27,25 @@ import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.util.Assert;
/**
* {@link org.springframework.cloud.stream.binding.BindingTargetFactory} for{@link KStream}.
*
* The implementation creates proxies for both input and output binding.
* The actual target will be created downstream through further binding process.
*
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KStream> {
class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KStream> {
private final BindingServiceProperties bindingServiceProperties;
private final KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
public KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue) {
KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
super(KStream.class);
this.bindingServiceProperties = bindingServiceProperties;
this.KStreamBindingInformationCatalogue = KStreamBindingInformationCatalogue;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
}
@Override
@@ -61,8 +66,9 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KSt
KStream proxy = (KStream) proxyFactory.getProxy();
//Add the binding properties to the catalogue for later retrieval during further binding steps downstream.
BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(name);
this.KStreamBindingInformationCatalogue.registerBindingProperties(proxy, bindingProperties);
this.kafkaStreamsBindingInformationCatalogue.registerBindingProperties(proxy, bindingProperties);
return proxy;
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
@@ -28,15 +28,15 @@ import org.springframework.core.ResolvableType;
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class KStreamListenerParameterAdapter implements StreamListenerParameterAdapter<KStream<?,?>, KStream<?, ?>> {
class KStreamStreamListenerParameterAdapter implements StreamListenerParameterAdapter<KStream<?,?>, KStream<?, ?>> {
private final KStreamBoundMessageConversionDelegate kStreamBoundMessageConversionDelegate;
private final KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue;
private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
private final KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue;
public KStreamListenerParameterAdapter(KStreamBoundMessageConversionDelegate kStreamBoundMessageConversionDelegate,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue) {
this.kStreamBoundMessageConversionDelegate = kStreamBoundMessageConversionDelegate;
this.KStreamBindingInformationCatalogue = KStreamBindingInformationCatalogue;
KStreamStreamListenerParameterAdapter(KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.KafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
}
@Override
@@ -51,11 +51,11 @@ public class KStreamListenerParameterAdapter implements StreamListenerParameterA
ResolvableType resolvableType = ResolvableType.forMethodParameter(parameter);
final Class<?> valueClass = (resolvableType.getGeneric(1).getRawClass() != null)
? (resolvableType.getGeneric(1).getRawClass()) : Object.class;
if (this.KStreamBindingInformationCatalogue.isUseNativeDecoding(bindingTarget)) {
if (this.KafkaStreamsBindingInformationCatalogue.isUseNativeDecoding(bindingTarget)) {
return bindingTarget.map((KeyValueMapper) KeyValue::new);
}
else {
return kStreamBoundMessageConversionDelegate.deserializeOnInbound(valueClass, bindingTarget);
return kafkaStreamsMessageConversionDelegate.deserializeOnInbound(valueClass, bindingTarget);
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.io.Closeable;
import java.io.IOException;
@@ -26,8 +26,9 @@ import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
/**
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class KStreamStreamListenerResultAdapter implements StreamListenerResultAdapter<KStream, KStreamBoundElementFactory.KStreamWrapper> {
class KStreamStreamListenerResultAdapter implements StreamListenerResultAdapter<KStream, KStreamBoundElementFactory.KStreamWrapper> {
@Override
public boolean supports(Class<?> resultType, Class<?> boundElement) {

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -28,44 +28,46 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kstream.config.KStreamBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.util.StringUtils;
/**
* {@link org.springframework.cloud.stream.binder.Binder} implementation for {@link KTable}.
* This implemenation extends from the {@link AbstractBinder} directly.
*
* @since 2.0.0
* Provides only consumer binding for the bound KTable as output bindings are not allowed on it.
*
* @author Soby Chacko
*/
public class KTableBinder extends
AbstractBinder<KTable<Object, Object>, ExtendedConsumerProperties<KStreamConsumerProperties>, ExtendedProducerProperties<KStreamProducerProperties>>
implements ExtendedPropertiesBinder<KTable<Object, Object>, KStreamConsumerProperties, KStreamProducerProperties> {
class KTableBinder extends
AbstractBinder<KTable<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
implements ExtendedPropertiesBinder<KTable<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
private final KStreamBinderConfigurationProperties binderConfigurationProperties;
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
private final KafkaTopicProvisioner kafkaTopicProvisioner;
private final KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue;
private KStreamExtendedBindingProperties kStreamExtendedBindingProperties = new KStreamExtendedBindingProperties();
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
public KTableBinder(KStreamBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner,
KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue) {
KTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
KStreamBindingInformationCatalogue = kStreamBindingInformationCatalogue;
this.KafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
}
@Override
@SuppressWarnings("unchecked")
protected Binding<KTable<Object, Object>> doBindConsumer(String name, String group, KTable<Object, Object> inputTarget,
ExtendedConsumerProperties<KStreamConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
properties.getExtension());
if (binderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.sendToDlq) {
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
extendedConsumerProperties.getExtension().setEnableDlq(true);
}
if (!StringUtils.hasText(group)) {
@@ -76,15 +78,15 @@ public class KTableBinder extends
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
String dlqName = StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName()) ?
"error." + name + "." + group : extendedConsumerProperties.getExtension().getDlqName();
KStreamDlqDispatch kStreamDlqDispatch = new KStreamDlqDispatch(dlqName, binderConfigurationProperties,
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = new KafkaStreamsDlqDispatch(dlqName, binderConfigurationProperties,
extendedConsumerProperties.getExtension());
SendToDlqAndContinue sendToDlqAndContinue = this.getApplicationContext().getBean(SendToDlqAndContinue.class);
sendToDlqAndContinue.addKStreamDlqDispatch(name, kStreamDlqDispatch);
sendToDlqAndContinue.addKStreamDlqDispatch(name, kafkaStreamsDlqDispatch);
StreamsConfig streamsConfig = this.KStreamBindingInformationCatalogue.getStreamsConfig(inputTarget);
StreamsConfig streamsConfig = this.KafkaStreamsBindingInformationCatalogue.getStreamsConfig(inputTarget);
DeserializationExceptionHandler deserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
if(deserializationExceptionHandler instanceof SendToDlqAndContinue) {
((SendToDlqAndContinue)deserializationExceptionHandler).addKStreamDlqDispatch(name, kStreamDlqDispatch);
((SendToDlqAndContinue)deserializationExceptionHandler).addKStreamDlqDispatch(name, kafkaStreamsDlqDispatch);
}
}
return new DefaultBinding<>(name, group, inputTarget, null);
@@ -92,17 +94,17 @@ public class KTableBinder extends
@Override
protected Binding<KTable<Object, Object>> doBindProducer(String name, KTable<Object, Object> outboundBindTarget,
ExtendedProducerProperties<KStreamProducerProperties> properties) {
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
throw new UnsupportedOperationException("No producer level binding is allowed for KTable");
}
@Override
public KStreamConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.kStreamExtendedBindingProperties.getExtendedConsumerProperties(channelName);
public KafkaStreamsConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public KStreamProducerProperties getExtendedProducerProperties(String channelName) {
return this.kStreamExtendedBindingProperties.getExtendedProducerProperties(channelName);
public KafkaStreamsProducerProperties getExtendedProducerProperties(String channelName) {
return this.kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties(channelName);
}
}

View File

@@ -14,14 +14,14 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kstream.KStreamBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kstream.KTableBinder;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.annotation.Bean;
/**
@@ -33,7 +33,7 @@ public class KTableBinderConfiguration {
private KafkaProperties kafkaProperties;
@Autowired
private KStreamExtendedBindingProperties kStreamExtendedBindingProperties;
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties) {
@@ -41,11 +41,11 @@ public class KTableBinderConfiguration {
}
@Bean
public KTableBinder kTableBinder(KStreamBinderConfigurationProperties binderConfigurationProperties,
public KTableBinder kTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue) {
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
KTableBinder kStreamBinder = new KTableBinder(binderConfigurationProperties, kafkaTopicProvisioner,
KStreamBindingInformationCatalogue);
KafkaStreamsBindingInformationCatalogue);
return kStreamBinder;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
@@ -27,12 +27,13 @@ import org.springframework.util.Assert;
/**
* {@link org.springframework.cloud.stream.binding.BindingTargetFactory} for {@link KTable}
*
* @since 2.0.0
* Input bindings are only created as output bindings on KTable are not allowed.
*
* @author Soby Chacko
*/
public class KTableBoundElementFactory extends AbstractBindingTargetFactory<KTable> {
class KTableBoundElementFactory extends AbstractBindingTargetFactory<KTable> {
public KTableBoundElementFactory() {
KTableBoundElementFactory() {
super(KTable.class);
}

View File

@@ -14,12 +14,13 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -27,12 +28,12 @@ import org.springframework.context.annotation.Configuration;
* @author Soby Chacko
*/
@Configuration
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
public class KStreamApplicationSupportAutoConfiguration {
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public class KafkaStreamsApplicationSupportAutoConfiguration {
@Bean
@ConditionalOnProperty("spring.cloud.stream.kstream.timeWindow.length")
public TimeWindows configuredTimeWindow(KStreamApplicationSupportProperties processorProperties) {
@ConditionalOnProperty("spring.cloud.stream.kafka.streams.timeWindow.length")
public TimeWindows configuredTimeWindow(KafkaStreamsApplicationSupportProperties processorProperties) {
return processorProperties.getTimeWindow().getAdvanceBy() > 0
? TimeWindows.of(processorProperties.getTimeWindow().getLength()).advanceBy(processorProperties.getTimeWindow().getAdvanceBy())
: TimeWindows.of(processorProperties.getTimeWindow().getLength());

View File

@@ -0,0 +1,152 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ObjectUtils;
/**
* @author Marius Bogoevici
* @author Soby Chacko
*/
@EnableConfigurationProperties(KafkaStreamsExtendedBindingProperties.class)
public class KafkaStreamsBinderSupportAutoConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties() {
return new KafkaStreamsBinderConfigurationProperties();
}
@Bean("streamConfigGlobalProperties")
public Map<String, Object> streamConfigGlobalProperties(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, binderConfigurationProperties.getApplicationId());
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
} else if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
} else if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
SendToDlqAndContinue.class);
}
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) {
props.putAll(binderConfigurationProperties.getConfiguration());
}
return props;
}
@Bean
public KStreamStreamListenerResultAdapter kstreamStreamListenerResultAdapter() {
return new KStreamStreamListenerResultAdapter();
}
@Bean
public KStreamStreamListenerParameterAdapter kstreamStreamListenerParameterAdapter(
KafkaStreamsMessageConversionDelegate kstreamBoundMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
return new KStreamStreamListenerParameterAdapter(kstreamBoundMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue);
}
@Bean
public KafkaStreamsStreamListenerSetupMethodOrchestrator kafkaStreamsStreamListenerSetupMethodOrchestrator(
BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
return new KafkaStreamsStreamListenerSetupMethodOrchestrator(bindingServiceProperties,
kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters, binderConfigurationProperties);
}
@Bean
public KafkaStreamsMessageConversionDelegate messageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory,
SendToDlqAndContinue sendToDlqAndContinue,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
return new KafkaStreamsMessageConversionDelegate(compositeMessageConverterFactory, sendToDlqAndContinue,
KafkaStreamsBindingInformationCatalogue, binderConfigurationProperties);
}
@Bean
public KStreamBoundElementFactory kStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
return new KStreamBoundElementFactory(bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue);
}
@Bean
public KTableBoundElementFactory kTableBoundElementFactory() {
return new KTableBoundElementFactory();
}
@Bean
public SendToDlqAndContinue sendToDlqAndContinue() {
return new SendToDlqAndContinue();
}
@Bean
public KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue() {
return new KafkaStreamsBindingInformationCatalogue();
}
@Bean
@SuppressWarnings("unchecked")
public KeyValueSerdeResolver keyValueSerdeResolver(@Qualifier("streamConfigGlobalProperties") Object streamConfigGlobalProperties,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties) {
return new KeyValueSerdeResolver((Map<String, Object>) streamConfigGlobalProperties, kafkaStreamsBinderConfigurationProperties);
}
@Bean
public QueryableStoreRegistry queryableStoreTypeRegistry() {
return new QueryableStoreRegistry();
}
@Bean
public StreamsBuilderFactoryManager streamsBuilderFactoryManager(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
QueryableStoreRegistry queryableStoreRegistry) {
return new StreamsBuilderFactoryManager(kafkaStreamsBindingInformationCatalogue, queryableStoreRegistry);
}
}

View File

@@ -14,9 +14,8 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -26,27 +25,26 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
/**
* A catalogue containing all the inbound and outboud KStreams.
* It registers {@link BindingProperties} and {@link KStreamConsumerProperties}
* for the bounded KStreams. This registry provides services for finding
* specific binding level information for the bounded KStream. This includes
* information such as the configured content type, destination etc.
*
* @since 2.0.0
* A catalogue that provides binding information for Kafka Streams target types such as KStream.
* It also keeps a catalogue for the underlying {@link StreamsBuilderFactoryBean} and
* {@link StreamsConfig} associated with various {@link org.springframework.cloud.stream.annotation.StreamListener}
* methods in the {@link org.springframework.context.ApplicationContext}.
*
* @author Soby Chacko
*/
public class KStreamBindingInformationCatalogue {
class KafkaStreamsBindingInformationCatalogue {
private final Map<KStream<?, ?>, BindingProperties> bindingProperties = new ConcurrentHashMap<>();
private final Map<KStream<?, ?>, KStreamConsumerProperties> consumerProperties = new ConcurrentHashMap<>();
private final Map<Object, StreamsConfig> streamsConfigs = new HashMap<>();
private final Map<KStream<?, ?>, KafkaStreamsConsumerProperties> consumerProperties = new ConcurrentHashMap<>();
private final Map<Object, StreamsConfig> streamsConfigs = new ConcurrentHashMap<>();
private final Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = new HashSet<>();
/**
@@ -56,7 +54,7 @@ public class KStreamBindingInformationCatalogue {
* @param bindingTarget KStream binding target
* @return destination topic on Kafka
*/
public String getDestination(KStream<?,?> bindingTarget) {
String getDestination(KStream<?,?> bindingTarget) {
BindingProperties bindingProperties = this.bindingProperties.get(bindingTarget);
return bindingProperties.getDestination();
}
@@ -67,7 +65,7 @@ public class KStreamBindingInformationCatalogue {
* @param bindingTarget KStream binding target
* @return true if native decoding is enabled, fasle otherwise.
*/
public boolean isUseNativeDecoding(KStream<?,?> bindingTarget) {
boolean isUseNativeDecoding(KStream<?,?> bindingTarget) {
BindingProperties bindingProperties = this.bindingProperties.get(bindingTarget);
if (bindingProperties.getConsumer() == null) {
bindingProperties.setConsumer(new ConsumerProperties());
@@ -81,7 +79,7 @@ public class KStreamBindingInformationCatalogue {
* @param bindingTarget KStream binding target
* @return true if DLQ is enabled, false otherwise.
*/
public boolean isDlqEnabled(KStream<?,?> bindingTarget) {
boolean isDlqEnabled(KStream<?,?> bindingTarget) {
return consumerProperties.get(bindingTarget).isEnableDlq();
}
@@ -91,7 +89,7 @@ public class KStreamBindingInformationCatalogue {
* @param bindingTarget KStream binding target
* @return content Type associated.
*/
public String getContentType(KStream<?,?> bindingTarget) {
String getContentType(KStream<?,?> bindingTarget) {
BindingProperties bindingProperties = this.bindingProperties.get(bindingTarget);
return bindingProperties.getContentType();
}
@@ -102,7 +100,7 @@ public class KStreamBindingInformationCatalogue {
* @param bindingTarget KStream binding target
* @return corresponding {@link StreamsBuilderFactoryBean}
*/
public StreamsConfig getStreamsConfig(Object bindingTarget) {
StreamsConfig getStreamsConfig(Object bindingTarget) {
return streamsConfigs.get(bindingTarget);
}
@@ -112,18 +110,18 @@ public class KStreamBindingInformationCatalogue {
* @param bindingTarget KStream binding target
* @param bindingProperties {@link BindingProperties} for this KStream
*/
public void registerBindingProperties(KStream<?,?> bindingTarget, BindingProperties bindingProperties) {
void registerBindingProperties(KStream<?,?> bindingTarget, BindingProperties bindingProperties) {
this.bindingProperties.put(bindingTarget, bindingProperties);
}
/**
* Register a cache for bounded KStream -> {@link KStreamConsumerProperties}
* Register a cache for bounded KStream -> {@link KafkaStreamsConsumerProperties}
*
* @param bindingTarget KStream binding target
* @param kStreamConsumerProperties Consumer properties for this KStream
* @param kafkaStreamsConsumerProperties Consumer properties for this KStream
*/
public void registerConsumerProperties(KStream<?,?> bindingTarget, KStreamConsumerProperties kStreamConsumerProperties) {
this.consumerProperties.put(bindingTarget, kStreamConsumerProperties);
void registerConsumerProperties(KStream<?,?> bindingTarget, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
this.consumerProperties.put(bindingTarget, kafkaStreamsConsumerProperties);
}
/**
@@ -131,15 +129,15 @@ public class KStreamBindingInformationCatalogue {
*
* @param streamsBuilderFactoryBean provides the {@link StreamsBuilderFactoryBean} mapped to the KStream
*/
public void addStreamBuilderFactory(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
void addStreamBuilderFactory(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
this.streamsBuilderFactoryBeans.add(streamsBuilderFactoryBean);
}
public void addStreamsConfigs(Object bindingTarget, StreamsConfig streamsConfig) {
void addStreamsConfigs(Object bindingTarget, StreamsConfig streamsConfig) {
this.streamsConfigs.put(bindingTarget, streamsConfig);
}
public Set<StreamsBuilderFactoryBean> getStreamsBuilderFactoryBeans() {
Set<StreamsBuilderFactoryBean> getStreamsBuilderFactoryBeans() {
return streamsBuilderFactoryBeans;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
/**
* @author Soby Chacko
@@ -40,17 +40,18 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
class KStreamDlqDispatch {
class KafkaStreamsDlqDispatch {
private final Log logger = LogFactory.getLog(getClass());
private final KafkaTemplate<byte[],byte[]> kafkaTemplate;
private final String dlqName;
KStreamDlqDispatch(String dlqName,
KafkaStreamsDlqDispatch(String dlqName,
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaConsumerProperties kafkaConsumerProperties) {
ProducerFactory<byte[],byte[]> producerFactory = getProducerFactory(null,
ProducerFactory<byte[],byte[]> producerFactory = getProducerFactory(
new ExtendedProducerProperties<>(kafkaConsumerProperties.getDlqProducerProperties()),
kafkaBinderConfigurationProperties);
@@ -64,9 +65,9 @@ class KStreamDlqDispatch {
key, value, null);
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
.append(toDisplayString(ObjectUtils.nullSafeToString(key))).append("'")
.append(" and payload='")
.append(toDisplayString(ObjectUtils.nullSafeToString(value), 50))
.append(toDisplayString(ObjectUtils.nullSafeToString(value)))
.append("'").append(" received from ")
.append(partittion);
ListenableFuture<SendResult<byte[],byte[]>> sentDlq = null;
@@ -76,14 +77,14 @@ class KStreamDlqDispatch {
@Override
public void onFailure(Throwable ex) {
KStreamDlqDispatch.this.logger.error(
KafkaStreamsDlqDispatch.this.logger.error(
"Error sending to DLQ " + sb.toString(), ex);
}
@Override
public void onSuccess(SendResult<byte[],byte[]> result) {
if (KStreamDlqDispatch.this.logger.isDebugEnabled()) {
KStreamDlqDispatch.this.logger.debug(
if (KafkaStreamsDlqDispatch.this.logger.isDebugEnabled()) {
KafkaStreamsDlqDispatch.this.logger.debug(
"Sent to DLQ " + sb.toString());
}
}
@@ -91,14 +92,13 @@ class KStreamDlqDispatch {
}
catch (Exception ex) {
if (sentDlq == null) {
KStreamDlqDispatch.this.logger.error(
KafkaStreamsDlqDispatch.this.logger.error(
"Error sending to DLQ " + sb.toString(), ex);
}
}
}
private DefaultKafkaProducerFactory<byte[],byte[]> getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
private DefaultKafkaProducerFactory<byte[],byte[]> getProducerFactory(ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -128,17 +128,14 @@ class KStreamDlqDispatch {
//Always send as byte[] on dlq (the same byte[] that the consumer received)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<byte[],byte[]> producerFactory = new DefaultKafkaProducerFactory<>(props);
if (transactionIdPrefix != null) {
producerFactory.setTransactionIdPrefix(transactionIdPrefix);
}
return producerFactory;
return new DefaultKafkaProducerFactory<>(props);
}
private String toDisplayString(String original, int maxCharacters) {
if (original.length() <= maxCharacters) {
private String toDisplayString(String original) {
if (original.length() <= 50) {
return original;
}
return original.substring(0, maxCharacters) + "...";
return original.substring(0, 50) + "...";
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.Map;
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.cloud.stream.binder.kstream.config.KStreamBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -40,11 +40,9 @@ import org.springframework.util.StringUtils;
* inbound messages based on a contentType. Based on the contentType, a {@link MessageConverter} will
* be resolved.
*
* @since 2.0.0
*
* @author Soby Chacko
*/
public class KStreamBoundMessageConversionDelegate {
class KafkaStreamsMessageConversionDelegate {
private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();
@@ -52,14 +50,14 @@ public class KStreamBoundMessageConversionDelegate {
private final SendToDlqAndContinue sendToDlqAndContinue;
private final KStreamBindingInformationCatalogue kstreamBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue;
private final KStreamBinderConfigurationProperties kstreamBinderConfigurationProperties;
private final KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties;
public KStreamBoundMessageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory,
SendToDlqAndContinue sendToDlqAndContinue,
KStreamBindingInformationCatalogue kstreamBindingInformationCatalogue,
KStreamBinderConfigurationProperties kstreamBinderConfigurationProperties) {
KafkaStreamsMessageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory,
SendToDlqAndContinue sendToDlqAndContinue,
KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue,
KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties) {
this.compositeMessageConverterFactory = compositeMessageConverterFactory;
this.sendToDlqAndContinue = sendToDlqAndContinue;
this.kstreamBindingInformationCatalogue = kstreamBindingInformationCatalogue;
@@ -103,7 +101,9 @@ public class KStreamBoundMessageConversionDelegate {
public KStream deserializeOnInbound(Class<?> valueClass, KStream<?, ?> bindingTarget) {
MessageConverter messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
//Deserialize using a branching strategy
KStream<?, ?>[] branch = bindingTarget.branch(
//First filter where the message is converted and return true if everything went well, return false otherwise.
(o, o2) -> {
boolean isValidRecord = false;
@@ -129,15 +129,17 @@ public class KStreamBoundMessageConversionDelegate {
isValidRecord = true;
}
catch (Exception ignored) {
System.out.println();
//pass through
}
return isValidRecord;
},
//sedond filter that catches any messages for which an exception thrown in the first filter above.
(k, v) -> true
);
//process errors from the second filter in the branch above.
processErrorFromDeserialization(bindingTarget, branch[1]);
//first branch above is the branch where the messages are converted, let it go through further processing.
return branch[0].map((o, o2) -> {
KeyValue<Object, Object> objectObjectKeyValue = keyValueThreadLocal.get();
keyValueThreadLocal.remove();
@@ -175,14 +177,15 @@ public class KStreamBoundMessageConversionDelegate {
sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) o2, context.partition());
}
}
else if (kstreamBinderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.logAndFail) {
else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
throw new IllegalStateException("Inbound deserialization failed.");
}
else if (kstreamBinderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.logAndContinue) {
else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
//quietly pass through. No action needed, this is similar to log and continue.
}
}
@SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) {

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.util.Collection;
@@ -45,9 +45,9 @@ import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.StreamListenerErrorMessages;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
@@ -72,36 +72,47 @@ import org.springframework.util.StringUtils;
* Kafka Streams specific implementation for {@link StreamListenerSetupMethodOrchestrator}
* that overrides the default mechanisms for invoking StreamListener adapters.
*
* @since 2.0.0
* The orchestration primarily focus on the following areas:
*
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one output values on {@link SendTo}
* 2. Allow multiple inbound bindings for multiple KStream and or KTable types.
* 3. Each StreamListener method that it orchestrates gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
*
* @author Soby Chacko
*/
public class KStreamListenerSetupMethodOrchestrator implements StreamListenerSetupMethodOrchestrator, ApplicationContextAware {
class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListenerSetupMethodOrchestrator, ApplicationContextAware {
private final static Log LOG = LogFactory.getLog(KStreamListenerSetupMethodOrchestrator.class);
private final static Log LOG = LogFactory.getLog(KafkaStreamsStreamListenerSetupMethodOrchestrator.class);
private final StreamListenerParameterAdapter streamListenerParameterAdapter;
private final Collection<StreamListenerResultAdapter> streamListenerResultAdapters;
private final BindingServiceProperties bindingServiceProperties;
private final KStreamExtendedBindingProperties kStreamExtendedBindingProperties;
private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
private final KeyValueSerdeResolver keyValueSerdeResolver;
private final KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
private final KStreamBinderConfigurationProperties binderConfigurationProperties;
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
private ConfigurableApplicationContext applicationContext;
public KStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties,
KStreamExtendedBindingProperties kStreamExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue,
StreamListenerParameterAdapter streamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KStreamBinderConfigurationProperties binderConfigurationProperties) {
KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
StreamListenerParameterAdapter streamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
this.bindingServiceProperties = bindingServiceProperties;
this.kStreamExtendedBindingProperties = kStreamExtendedBindingProperties;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kStreamBindingInformationCatalogue = kStreamBindingInformationCatalogue;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.streamListenerParameterAdapter = streamListenerParameterAdapter;
this.streamListenerResultAdapters = streamListenerResultAdapters;
this.binderConfigurationProperties = binderConfigurationProperties;
@@ -109,7 +120,7 @@ public class KStreamListenerSetupMethodOrchestrator implements StreamListenerSet
@Override
public boolean supports(Method method) {
return methodParameterSuppports(method) &&
return methodParameterSupports(method) &&
(methodReturnTypeSuppports(method) || Void.TYPE.equals(method.getReturnType()));
}
@@ -122,7 +133,7 @@ public class KStreamListenerSetupMethodOrchestrator implements StreamListenerSet
return false;
}
private boolean methodParameterSuppports(Method method) {
private boolean methodParameterSupports(Method method) {
boolean supports = false;
for (int i = 0; i < method.getParameterCount(); i++) {
MethodParameter methodParameter = MethodParameter.forExecutable(method, i);
@@ -209,24 +220,26 @@ public class KStreamListenerSetupMethodOrchestrator implements StreamListenerSet
BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(inboundName);
enableNativeDecodingForKTableAlways(parameterType, bindingProperties);
StreamsConfig streamsConfig = null;
//Retrieve the StreamsConfig created for this method if available.
//Otherwise, carete the StreamsBuilderFactory and get the underlying config.
if (!methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
streamsConfig = buildStreamsBuilderAndRetrieveConfig(method, applicationContext, bindingProperties);
}
try {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = methodStreamsBuilderFactoryBeanMap.get(method);
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
KStreamConsumerProperties extendedConsumerProperties = kStreamExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
KafkaStreamsConsumerProperties extendedConsumerProperties = kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties);
Serde<?> valueSerde = this.keyValueSerdeResolver.getInboundValueSerde(bindingProperties.getConsumer(), extendedConsumerProperties);
if (parameterType.isAssignableFrom(KStream.class)) {
KStream<?, ?> stream = getkStream(inboundName, bindingProperties, streamsBuilder, keySerde, valueSerde);
KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
//wrap the proxy created during the initial target type binding with real object (KStream)
kStreamWrapper.wrap((KStream<Object, Object>) stream);
kStreamBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
if (streamsConfig != null){
kStreamBindingInformationCatalogue.addStreamsConfigs(kStreamWrapper, streamsConfig);
kafkaStreamsBindingInformationCatalogue.addStreamsConfigs(kStreamWrapper, streamsConfig);
}
// Iterate existing parameter adapters first
for (StreamListenerParameterAdapter streamListenerParameterAdapter : streamListenerParameterAdapters) {
if (streamListenerParameterAdapter.supports(stream.getClass(), methodParameter)) {
arguments[parameterIndex] = streamListenerParameterAdapter.adapt(kStreamWrapper, methodParameter);
@@ -247,10 +260,11 @@ public class KStreamListenerSetupMethodOrchestrator implements StreamListenerSet
streamsBuilder.table(bindingDestination,
Consumed.with(keySerde, valueSerde));
KTableBoundElementFactory.KTableWrapper kTableWrapper = (KTableBoundElementFactory.KTableWrapper) targetBean;
//wrap the proxy created during the initial target type binding with real object (KTable)
kTableWrapper.wrap((KTable<Object, Object>) table);
kStreamBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
if (streamsConfig != null){
kStreamBindingInformationCatalogue.addStreamsConfigs(kTableWrapper, streamsConfig);
kafkaStreamsBindingInformationCatalogue.addStreamsConfigs(kTableWrapper, streamsConfig);
}
arguments[parameterIndex] = table;
}
@@ -327,6 +341,8 @@ public class KStreamListenerSetupMethodOrchestrator implements StreamListenerSet
}
Map<String, Object> streamConfigGlobalProperties = applicationContext.getBean("streamConfigGlobalProperties", Map.class);
streamConfigGlobalProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, group);
//Custom StreamsConfig implementation that overrides to guarantee that the deserialization handler is cached.
StreamsConfig streamsConfig = new StreamsConfig(streamConfigGlobalProperties) {
DeserializationExceptionHandler deserializationExceptionHandler;
@Override

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
@@ -24,9 +24,9 @@ import org.apache.kafka.common.utils.Utils;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.util.StringUtils;
/**
@@ -43,18 +43,16 @@ import org.springframework.util.StringUtils;
* If native encoding is disabled, then the binder will do serialization using a contentType. Keys are always serialized
* by the broker.
*
* @since 2.0.0
*
* @author Soby Chacko
*/
public class KeyValueSerdeResolver {
class KeyValueSerdeResolver {
private final Map<String,Object> streamConfigGlobalProperties;
private final KStreamBinderConfigurationProperties binderConfigurationProperties;
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
public KeyValueSerdeResolver(Map<String,Object> streamConfigGlobalProperties,
KStreamBinderConfigurationProperties binderConfigurationProperties) {
KeyValueSerdeResolver(Map<String,Object> streamConfigGlobalProperties,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
this.streamConfigGlobalProperties = streamConfigGlobalProperties;
this.binderConfigurationProperties = binderConfigurationProperties;
}
@@ -62,10 +60,10 @@ public class KeyValueSerdeResolver {
/**
* Provide the {@link Serde} for inbound key
*
* @param extendedConsumerProperties binding level extended {@link KStreamConsumerProperties}
* @param extendedConsumerProperties binding level extended {@link KafkaStreamsConsumerProperties}
* @return configurd {@link Serde} for the inbound key.
*/
public Serde<?> getInboundKeySerde(KStreamConsumerProperties extendedConsumerProperties) {
public Serde<?> getInboundKeySerde(KafkaStreamsConsumerProperties extendedConsumerProperties) {
String keySerdeString = extendedConsumerProperties.getKeySerde();
return getKeySerde(keySerdeString);
@@ -75,10 +73,10 @@ public class KeyValueSerdeResolver {
* Provide the {@link Serde} for inbound value
*
* @param consumerProperties {@link ConsumerProperties} on binding
* @param extendedConsumerProperties binding level extended {@link KStreamConsumerProperties}
* @param extendedConsumerProperties binding level extended {@link KafkaStreamsConsumerProperties}
* @return configurd {@link Serde} for the inbound value.
*/
public Serde<?> getInboundValueSerde(ConsumerProperties consumerProperties, KStreamConsumerProperties extendedConsumerProperties) {
public Serde<?> getInboundValueSerde(ConsumerProperties consumerProperties, KafkaStreamsConsumerProperties extendedConsumerProperties) {
Serde<?> valueSerde;
String valueSerdeString = extendedConsumerProperties.getValueSerde();
@@ -101,10 +99,10 @@ public class KeyValueSerdeResolver {
/**
* Provide the {@link Serde} for outbound key
*
* @param properties binding level extended {@link KStreamProducerProperties}
* @param properties binding level extended {@link KafkaStreamsProducerProperties}
* @return configurd {@link Serde} for the outbound key.
*/
public Serde<?> getOuboundKeySerde(KStreamProducerProperties properties) {
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties) {
return getKeySerde(properties.getKeySerde());
}
@@ -112,14 +110,14 @@ public class KeyValueSerdeResolver {
* Provide the {@link Serde} for outbound value
*
* @param producerProperties {@link ProducerProperties} on binding
* @param kStreamProducerProperties binding level extended {@link KStreamProducerProperties}
* @param kafkaStreamsProducerProperties binding level extended {@link KafkaStreamsProducerProperties}
* @return configurd {@link Serde} for the outbound value.
*/
public Serde<?> getOutboundValueSerde(ProducerProperties producerProperties, KStreamProducerProperties kStreamProducerProperties) {
public Serde<?> getOutboundValueSerde(ProducerProperties producerProperties, KafkaStreamsProducerProperties kafkaStreamsProducerProperties) {
Serde<?> valueSerde;
try {
if (producerProperties.isUseNativeEncoding()) {
valueSerde = getValueSerde(kStreamProducerProperties.getValueSerde());
valueSerde = getValueSerde(kafkaStreamsProducerProperties.getValueSerde());
}
else {
valueSerde = Serdes.ByteArray();

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashSet;
import java.util.Set;
@@ -26,8 +26,8 @@ import org.apache.kafka.streams.state.QueryableStoreType;
* Registry that contains {@link QueryableStoreType}s those created from
* the user applications.
*
* @since 2.0.0
* @author Soby Chacko
* @since 2.0.0
*/
public class QueryableStoreRegistry {
@@ -38,7 +38,7 @@ public class QueryableStoreRegistry {
*
* @param storeName name of the queryable store
* @param storeType type of the queryable store
* @param <T> generic queryable store
* @param <T> generic queryable store
* @return queryable store.
*/
public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {
@@ -57,7 +57,7 @@ public class QueryableStoreRegistry {
*
* @param kafkaStreams {@link KafkaStreams} object created in the application
*/
public void registerKafkaStreams(KafkaStreams kafkaStreams) {
void registerKafkaStreams(KafkaStreams kafkaStreams) {
this.kafkaStreams.add(kafkaStreams);
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Field;
import java.util.HashMap;
@@ -41,17 +41,30 @@ import org.springframework.util.ReflectionUtils;
*/
public class SendToDlqAndContinue implements DeserializationExceptionHandler{
private Map<String, KStreamDlqDispatch> dlqDispatchers = new HashMap<>();
/**
* DLQ dispatcher per topic in the application context. The key here is not the actual DLQ topic
* but the incoming topic that caused the error.
*/
private Map<String, KafkaStreamsDlqDispatch> dlqDispatchers = new HashMap<>();
public void sendToDlq(String topic, byte[] key, byte[] value, int partittion){
KStreamDlqDispatch kStreamDlqDispatch = dlqDispatchers.get(topic);
kStreamDlqDispatch.sendToDlq(key,value, partittion);
/**
* For a given topic, send the key/value record to DLQ topic.
*
* @param topic incoming topic that caused the error
* @param key to send
* @param value to send
* @param partition for the topic where this record should be sent
*/
public void sendToDlq(String topic, byte[] key, byte[] value, int partition){
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = dlqDispatchers.get(topic);
kafkaStreamsDlqDispatch.sendToDlq(key,value, partition);
}
@Override
@SuppressWarnings("unchecked")
public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
KStreamDlqDispatch kStreamDlqDispatch = dlqDispatchers.get(record.topic());
kStreamDlqDispatch.sendToDlq(record.key(), record.value(), record.partition());
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = dlqDispatchers.get(record.topic());
kafkaStreamsDlqDispatch.sendToDlq(record.key(), record.value(), record.partition());
context.commit();
// The following conditional block should be reconsidered when we have a solution for this SO problem:
@@ -89,7 +102,7 @@ public class SendToDlqAndContinue implements DeserializationExceptionHandler{
}
public void addKStreamDlqDispatch(String topic, KStreamDlqDispatch kStreamDlqDispatch){
dlqDispatchers.put(topic, kStreamDlqDispatch);
void addKStreamDlqDispatch(String topic, KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch){
dlqDispatchers.put(topic, kafkaStreamsDlqDispatch);
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Set;
@@ -23,17 +23,28 @@ import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
/**
* Iterate through all {@link StreamsBuilderFactoryBean} in the application context
* and start them. As each one completes starting, register the associated KafkaStreams
* object into {@link QueryableStoreRegistry}.
*
* This {@link SmartLifecycle} class ensures that the bean created from it is started very late
* through the bootstrap process by setting the phase value closer to Integer.MAX_VALUE.
* This is to guarantee that the {@link StreamsBuilderFactoryBean} on a
* {@link org.springframework.cloud.stream.annotation.StreamListener} method with multiple
* bindings is only started after all the binding phases have completed successfully.
*
* @author Soby Chacko
*/
public class StreamsBuildersLifecycle implements SmartLifecycle {
class StreamsBuilderFactoryManager implements SmartLifecycle {
private final KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final QueryableStoreRegistry queryableStoreRegistry;
private volatile boolean running;
public StreamsBuildersLifecycle(KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue, QueryableStoreRegistry queryableStoreRegistry) {
this.kStreamBindingInformationCatalogue = kStreamBindingInformationCatalogue;
StreamsBuilderFactoryManager(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
QueryableStoreRegistry queryableStoreRegistry) {
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.queryableStoreRegistry = queryableStoreRegistry;
}
@@ -54,7 +65,7 @@ public class StreamsBuildersLifecycle implements SmartLifecycle {
public synchronized void start() {
if (!this.running) {
try {
Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kStreamBindingInformationCatalogue.getStreamsBuilderFactoryBeans();
Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans();
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
streamsBuilderFactoryBean.start();
queryableStoreRegistry.registerKafkaStreams(streamsBuilderFactoryBean.getKafkaStreams());
@@ -70,7 +81,7 @@ public class StreamsBuildersLifecycle implements SmartLifecycle {
public synchronized void stop() {
if (this.running) {
try {
Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kStreamBindingInformationCatalogue.getStreamsBuilderFactoryBeans();
Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans();
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
streamsBuilderFactoryBean.stop();
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.annotations;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
/**
* Bindable interface for {@link KStream} input and output.
*
* This interface can be used as a bindable interface with {@link org.springframework.cloud.stream.annotation.EnableBinding}
* when both input and output types are single KStream. In other scenarios where multiple types are required, other
* similar bindable interfaces can be created and used. For example, there are cases in which multiple KStreams
* are required on the outbound in the case of KStream branching or multiple input types are required either in the
* form of multiple KStreams and a combination of KStreams and KTables. In those cases, new bindable interfaces compatible
* with the requirements must be created. Here are some examples.
*
* <pre class="code">
* interface KStreamBranchProcessor {
* &#064;Input("input")
* KStream<?, ?> input();
*
* &#064;Output("output-1")
* KStream<?, ?> output1();
*
* &#064;Output("output-2")
* KStream<?, ?> output2();
*
* &#064;Output("output-3")
* KStream<?, ?> output3();
*
* ......
*
* }
*</pre>
*
* <pre class="code">
* interface KStreamKtableProcessor {
* &#064;Input("input-1")
* KStream<?, ?> input1();
*
* &#064;Input("input-2")
* KTable<?, ?> input2();
*
* &#064;Output("output")
* KStream<?, ?> output();
*
* ......
*
* }
*</pre>
*
* @author Marius Bogoevici
* @author Soby Chacko
*/
public interface KafkaStreamsProcessor {
/**
* @return {@link Input} binding for {@link KStream} type.
*/
@Input("input")
KStream<?, ?> input();
/**
* @return {@link Output} binding for {@link KStream} type.
*/
@Output("output")
KStream<?, ?> output();
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -26,8 +26,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
*
* @author Soby Chacko
*/
@ConfigurationProperties("spring.cloud.stream.kstream")
public class KStreamApplicationSupportProperties {
@ConfigurationProperties("spring.cloud.stream.kafka.streams")
public class KafkaStreamsApplicationSupportProperties {
private TimeWindow timeWindow;

View File

@@ -14,14 +14,14 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
/**
* @author Soby Chacko
*/
public class KStreamBinderConfigurationProperties extends KafkaBinderConfigurationProperties {
public class KafkaStreamsBinderConfigurationProperties extends KafkaBinderConfigurationProperties {
public enum SerdeError {
logAndContinue,
@@ -41,16 +41,16 @@ public class KStreamBinderConfigurationProperties extends KafkaBinderConfigurati
/**
* {@link org.apache.kafka.streams.errors.DeserializationExceptionHandler} to use
* when there is a Serde error. {@link KStreamBinderConfigurationProperties.SerdeError}
* when there is a Serde error. {@link KafkaStreamsBinderConfigurationProperties.SerdeError}
* values are used to provide the exception handler on consumer binding.
*/
private KStreamBinderConfigurationProperties.SerdeError serdeError;
private KafkaStreamsBinderConfigurationProperties.SerdeError serdeError;
public KStreamBinderConfigurationProperties.SerdeError getSerdeError() {
public KafkaStreamsBinderConfigurationProperties.SerdeError getSerdeError() {
return serdeError;
}
public void setSerdeError(KStreamBinderConfigurationProperties.SerdeError serdeError) {
public void setSerdeError(KafkaStreamsBinderConfigurationProperties.SerdeError serdeError) {
this.serdeError = serdeError;
}

View File

@@ -14,30 +14,30 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams.properties;
/**
* @author Marius Bogoevici
*/
public class KStreamBindingProperties {
public class KafkaStreamsBindingProperties {
private KStreamConsumerProperties consumer = new KStreamConsumerProperties();
private KafkaStreamsConsumerProperties consumer = new KafkaStreamsConsumerProperties();
private KStreamProducerProperties producer = new KStreamProducerProperties();
private KafkaStreamsProducerProperties producer = new KafkaStreamsProducerProperties();
public KStreamConsumerProperties getConsumer() {
public KafkaStreamsConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(KStreamConsumerProperties consumer) {
public void setConsumer(KafkaStreamsConsumerProperties consumer) {
this.consumer = consumer;
}
public KStreamProducerProperties getProducer() {
public KafkaStreamsProducerProperties getProducer() {
return producer;
}
public void setProducer(KStreamProducerProperties producer) {
public void setProducer(KafkaStreamsProducerProperties producer) {
this.producer = producer;
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
@@ -22,7 +22,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class KStreamConsumerProperties extends KafkaConsumerProperties {
public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
/**
* Key serde specified per binding.

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import java.util.HashMap;
import java.util.Map;
@@ -25,37 +25,37 @@ import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
* @author Marius Bogoevici
*/
@ConfigurationProperties("spring.cloud.stream.kstream")
public class KStreamExtendedBindingProperties
implements ExtendedBindingProperties<KStreamConsumerProperties, KStreamProducerProperties> {
@ConfigurationProperties("spring.cloud.stream.kafka.streams")
public class KafkaStreamsExtendedBindingProperties
implements ExtendedBindingProperties<KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
private Map<String, KStreamBindingProperties> bindings = new HashMap<>();
private Map<String, KafkaStreamsBindingProperties> bindings = new HashMap<>();
public Map<String, KStreamBindingProperties> getBindings() {
public Map<String, KafkaStreamsBindingProperties> getBindings() {
return this.bindings;
}
public void setBindings(Map<String, KStreamBindingProperties> bindings) {
public void setBindings(Map<String, KafkaStreamsBindingProperties> bindings) {
this.bindings = bindings;
}
@Override
public KStreamConsumerProperties getExtendedConsumerProperties(String binding) {
public KafkaStreamsConsumerProperties getExtendedConsumerProperties(String binding) {
if (this.bindings.containsKey(binding) && this.bindings.get(binding).getConsumer() != null) {
return this.bindings.get(binding).getConsumer();
}
else {
return new KStreamConsumerProperties();
return new KafkaStreamsConsumerProperties();
}
}
@Override
public KStreamProducerProperties getExtendedProducerProperties(String binding) {
public KafkaStreamsProducerProperties getExtendedProducerProperties(String binding) {
if (this.bindings.containsKey(binding) && this.bindings.get(binding).getProducer() != null) {
return this.bindings.get(binding).getProducer();
}
else {
return new KStreamProducerProperties();
return new KafkaStreamsProducerProperties();
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
@@ -22,7 +22,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerPro
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class KStreamProducerProperties extends KafkaProducerProperties {
public class KafkaStreamsProducerProperties extends KafkaProducerProperties {
/**
* Key serde specified per binding.

View File

@@ -0,0 +1,6 @@
kstream:\
org.springframework.cloud.stream.binder.kafka.streams.KStreamBinderConfiguration
ktable:\
org.springframework.cloud.stream.binder.kafka.streams.KTableBinderConfiguration

View File

@@ -0,0 +1,5 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsApplicationSupportAutoConfiguration

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.Map;
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -39,8 +41,8 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -69,14 +71,14 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts", "error.words.group");
@SpyBean
KStreamBoundMessageConversionDelegate KStreamBoundMessageConversionDelegate;
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
private static Consumer<String, String> consumer;
@BeforeClass
public static void setUp() throws Exception {
System.setProperty("spring.cloud.stream.kstream.binder.brokers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kstream.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.streams.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("server.port","0");
System.setProperty("spring.jmx.enabled","false");
@@ -97,8 +99,8 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"spring.cloud.stream.bindings.input.group=group",
"spring.cloud.stream.kstream.binder.serdeError=sendToDlq",
"spring.cloud.stream.kstream.binder.configuration.default.value.serde=" +
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=" +
"org.apache.kafka.common.serialization.Serdes$IntegerSerde"},
webEnvironment= SpringBootTest.WebEnvironment.NONE
)
@@ -123,17 +125,17 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
assertThat(cr.value().equals("foobar")).isTrue();
//Ensuring that the deserialization was indeed done by Kafka natively
verify(KStreamBoundMessageConversionDelegate, never()).deserializeOnInbound(any(Class.class), any(KStream.class));
verify(KStreamBoundMessageConversionDelegate, never()).serializeOnOutbound(any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate, never()).deserializeOnInbound(any(Class.class), any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate, never()).serializeOnOutbound(any(KStream.class));
}
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
@PropertySource("classpath:/org/springframework/cloud/stream/binder/kstream/integTest-1.properties")
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
static class WordCountProcessorApplication {
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@@ -145,8 +147,9 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "foo-WordCounts-x")
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.count(Materialized.as("foo-WordCounts-x"))
.toStream()
.map((key, value) -> new KeyValue<>(null, "Count for " + key.key() + " : " + value));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -35,7 +37,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -63,14 +65,14 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts-id", "error.foos.foobar-group");
@SpyBean
KStreamBoundMessageConversionDelegate KStreamBoundMessageConversionDelegate;
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
private static Consumer<Integer, String> consumer;
@BeforeClass
public static void setUp() throws Exception {
System.setProperty("spring.cloud.stream.kstream.binder.brokers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kstream.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.streams.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("server.port","0");
System.setProperty("spring.jmx.enabled","false");
@@ -91,13 +93,13 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=foos",
"spring.cloud.stream.bindings.output.destination=counts-id",
"spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.bindings.output.producer.headerMode=raw",
"spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"spring.cloud.stream.kstream.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.bindings.input.group=foobar-group"},
webEnvironment= SpringBootTest.WebEnvironment.NONE
)
@@ -122,11 +124,11 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
assertThat(cr.value().equals("hello")).isTrue();
//Ensuring that the deserialization was indeed done by the binder
verify(KStreamBoundMessageConversionDelegate).deserializeOnInbound(any(Class.class), any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate).deserializeOnInbound(any(Class.class), any(KStream.class));
}
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@@ -136,8 +138,9 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
return input
.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(TimeWindows.of(5000), "id-count-store-x")
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("id-count-store-x"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, value));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -31,10 +33,11 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -50,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Soby Chacko
* @author Gary Russell
*/
public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts-id");
@@ -75,19 +78,19 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
@Test
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
SpringApplication app = new SpringApplication(ProductCountApplication.class);
app.setWebEnvironment(false);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=foos",
"--spring.cloud.stream.bindings.output.destination=counts-id",
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidateFoo(context);
} finally {
@@ -109,7 +112,7 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
assertThat(aLong.equals(1L));
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@@ -119,8 +122,9 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
return input
.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(TimeWindows.of(5000), "id-count-store-x")
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("id-count-store-x"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, value));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.Date;
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -34,13 +36,14 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -56,7 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Soby Chacko
* @author Gary Russell
*/
public class KStreamBinderWordCountIntegrationTests {
public class KafkaStreamsBinderWordCountIntegrationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts");
@@ -80,22 +83,22 @@ public class KStreamBinderWordCountIntegrationTests {
@Test
public void testKstreamWordCountWithStringInputAndPojoOuput() throws Exception {
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
app.setWebEnvironment(false);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.timeWindow.length=5000",
"--spring.cloud.stream.kstream.timeWindow.advanceBy=0",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidate(context);
} finally {
@@ -113,9 +116,9 @@ public class KStreamBinderWordCountIntegrationTests {
assertThat(cr.value().contains("\"word\":\"foobar\",\"count\":1")).isTrue();
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public static class WordCountProcessorApplication {
@Autowired
@@ -133,8 +136,9 @@ public class KStreamBinderWordCountIntegrationTests {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "foo-WordCounts")
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.count(Materialized.as("foo-WordCounts"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.AfterClass;
@@ -33,10 +34,11 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -53,7 +55,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Soby Chacko
* @author Gary Russell
*/
public class KStreamInteractiveQueryIntegrationTests {
public class KafkaStreamsInteractiveQueryIntegrationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts-id");
@@ -77,18 +79,18 @@ public class KStreamInteractiveQueryIntegrationTests {
@Test
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
SpringApplication app = new SpringApplication(ProductCountApplication.class);
app.setWebEnvironment(false);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=foos",
"--spring.cloud.stream.bindings.output.destination=counts-id",
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidateFoo(context);
} finally {
@@ -109,7 +111,7 @@ public class KStreamInteractiveQueryIntegrationTests {
assertThat(foo.getProductStock(123).equals(1L));
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@@ -118,12 +120,13 @@ public class KStreamInteractiveQueryIntegrationTests {
@StreamListener("input")
@SendTo("output")
@SuppressWarnings("deprecation")
public KStream<?, String> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
.groupByKey(Serialized.with(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class)))
.count("prod-id-count-store")
.toStream()
.map((key, value) -> new KeyValue<>(null, "Count for product with ID 123: " + value));
@@ -142,15 +145,11 @@ public class KStreamInteractiveQueryIntegrationTests {
}
public Long getProductStock(Integer id) {
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("prod-id-count-store", QueryableStoreTypes.keyValueStore());
return (Long) keyValueStore.get(id);
}
}
}
static class Product {

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.Map;
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -39,8 +41,8 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -63,20 +65,20 @@ import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@ContextConfiguration
@DirtiesContext
public abstract class KStreamsNativeEncodingDecodingTests {
public abstract class KafkaStreamsNativeEncodingDecodingTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts");
@SpyBean
KStreamBoundMessageConversionDelegate KStreamBoundMessageConversionDelegate;
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
private static Consumer<String, String> consumer;
@BeforeClass
public static void setUp() throws Exception {
System.setProperty("spring.cloud.stream.kstream.binder.brokers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kstream.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.streams.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("server.port","0");
System.setProperty("spring.jmx.enabled","false");
@@ -98,7 +100,7 @@ public abstract class KStreamsNativeEncodingDecodingTests {
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=true"},
webEnvironment= SpringBootTest.WebEnvironment.NONE
)
public static class NativeEncodingDecodingEnabledTests extends KStreamsNativeEncodingDecodingTests {
public static class NativeEncodingDecodingEnabledTests extends KafkaStreamsNativeEncodingDecodingTests {
@Test
public void test() throws Exception {
@@ -110,13 +112,13 @@ public abstract class KStreamsNativeEncodingDecodingTests {
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts");
assertThat(cr.value().equals("Count for foobar : 1")).isTrue();
verify(KStreamBoundMessageConversionDelegate, never()).serializeOnOutbound(any(KStream.class));
verify(KStreamBoundMessageConversionDelegate, never()).deserializeOnInbound(any(Class.class), any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate, never()).serializeOnOutbound(any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate, never()).deserializeOnInbound(any(Class.class), any(KStream.class));
}
}
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.NONE)
public static class NativeEncodingDecodingDisabledTests extends KStreamsNativeEncodingDecodingTests {
public static class NativeEncodingDecodingDisabledTests extends KafkaStreamsNativeEncodingDecodingTests {
@Test
public void test() throws Exception {
@@ -128,16 +130,16 @@ public abstract class KStreamsNativeEncodingDecodingTests {
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts");
assertThat(cr.value().equals("Count for foobar : 1")).isTrue();
verify(KStreamBoundMessageConversionDelegate).serializeOnOutbound(any(KStream.class));
verify(KStreamBoundMessageConversionDelegate).deserializeOnInbound(any(Class.class), any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate).serializeOnOutbound(any(KStream.class));
verify(KafkaStreamsMessageConversionDelegate).deserializeOnInbound(any(Class.class), any(KStream.class));
}
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
@PropertySource("classpath:/org/springframework/cloud/stream/binder/kstream/integTest-1.properties")
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
static class WordCountProcessorApplication {
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@@ -149,8 +151,9 @@ public abstract class KStreamsNativeEncodingDecodingTests {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "foo-WordCounts-x")
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.count(Materialized.as("foo-WordCounts-x"))
.toStream()
.map((key, value) -> new KeyValue<>(null, "Count for " + key.key() + " : " + value));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -30,10 +32,11 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -50,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Soby Chacko
* @author Gary Russell
*/
public class KstreamBinderPojoInputStringOutputIntegrationTests {
public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts-id");
@@ -74,19 +77,19 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests {
@Test
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
SpringApplication app = new SpringApplication(ProductCountApplication.class);
app.setWebEnvironment(false);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=foos",
"--spring.cloud.stream.bindings.output.destination=counts-id",
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidateFoo(context);
} finally {
@@ -104,7 +107,7 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests {
assertThat(cr.value().contains("Count for product with ID 123: 1")).isTrue();
}
@EnableBinding(KStreamProcessor.class)
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@@ -115,8 +118,9 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests {
return input
.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(TimeWindows.of(5000), "id-count-store")
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("id-count-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, "Count for product with ID 123: " + value));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.ArrayList;
import java.util.Arrays;
@@ -42,13 +42,14 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -85,9 +86,9 @@ public class StreamToTableJoinIntegrationTests {
consumer.close();
}
@EnableBinding(KStreamProcessorX.class)
@EnableBinding(KafkaStreamsProcessorX.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public static class CountClicksPerRegionApplication {
@StreamListener
@@ -105,7 +106,7 @@ public class StreamToTableJoinIntegrationTests {
}
}
interface KStreamProcessorX extends KStreamProcessor {
interface KafkaStreamsProcessorX extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputX();
@@ -114,7 +115,7 @@ public class StreamToTableJoinIntegrationTests {
@Test
public void testStreamToTable() throws Exception {
SpringApplication app = new SpringApplication(CountClicksPerRegionApplication.class);
app.setWebEnvironment(false);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
@@ -124,20 +125,20 @@ public class StreamToTableJoinIntegrationTests {
"--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true",
"--spring.cloud.stream.bindings.inputX.consumer.useNativeDecoding=true",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"--spring.cloud.stream.kstream.bindings.input.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.bindings.input.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kstream.bindings.inputX.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.bindings.inputX.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=10000",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kafka.streams.bindings.inputX.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.bindings.inputX.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.bindings.inputX.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
// Input 1: Clicks per user (multiple records allowed per user).
List<KeyValue<String, Long>> userClicks = Arrays.asList(

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.Date;
@@ -35,13 +35,14 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -80,7 +81,7 @@ public class WordCountMultipleBranchesIntegrationTests {
@EnableBinding(KStreamProcessorX.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public static class WordCountProcessorApplication {
@Autowired
@@ -124,7 +125,7 @@ public class WordCountMultipleBranchesIntegrationTests {
@Test
public void testKstreamWordCountWithStringInputAndPojoOuput() throws Exception {
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
app.setWebEnvironment(false);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
@@ -135,15 +136,15 @@ public class WordCountMultipleBranchesIntegrationTests {
"--spring.cloud.stream.bindings.output2.contentType=application/json",
"--spring.cloud.stream.bindings.output3.destination=bar",
"--spring.cloud.stream.bindings.output3.contentType=application/json",
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.timeWindow.length=5000",
"--spring.cloud.stream.kstream.timeWindow.advanceBy=0",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidate(context);
} finally {

View File

@@ -0,0 +1,10 @@
spring.cloud.stream.bindings.input.destination=words
spring.cloud.stream.bindings.output.destination=counts
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.kafka.streams.timeWindow.length=5000
spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0

View File

@@ -1,5 +0,0 @@
eclipse.preferences.version=1
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99

View File

@@ -1,34 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.annotations;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
/**
* @author Marius Bogoevici
*/
public interface KStreamProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}

View File

@@ -1,163 +0,0 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kstream.config;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kstream.KStreamBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kstream.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kstream.KStreamBoundMessageConversionDelegate;
import org.springframework.cloud.stream.binder.kstream.KStreamListenerParameterAdapter;
import org.springframework.cloud.stream.binder.kstream.KStreamListenerSetupMethodOrchestrator;
import org.springframework.cloud.stream.binder.kstream.KStreamStreamListenerResultAdapter;
import org.springframework.cloud.stream.binder.kstream.KTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kstream.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kstream.QueryableStoreRegistry;
import org.springframework.cloud.stream.binder.kstream.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kstream.StreamsBuildersLifecycle;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ObjectUtils;
/**
* @author Marius Bogoevici
* @author Soby Chacko
*/
@EnableConfigurationProperties(KStreamExtendedBindingProperties.class)
public class KStreamBinderSupportAutoConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kstream.binder")
public KStreamBinderConfigurationProperties binderConfigurationProperties() {
return new KStreamBinderConfigurationProperties();
}
@Bean("streamConfigGlobalProperties")
public Map<String,Object> streamConfigGlobalProperties(KStreamBinderConfigurationProperties binderConfigurationProperties){
Map<String,Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, binderConfigurationProperties.getApplicationId());
if(binderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.logAndContinue) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
}
else if(binderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.logAndFail) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
}
else if (binderConfigurationProperties.getSerdeError() == KStreamBinderConfigurationProperties.SerdeError.sendToDlq) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
SendToDlqAndContinue.class);
}
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) {
props.putAll(binderConfigurationProperties.getConfiguration());
}
return props;
}
@Bean
public KStreamStreamListenerResultAdapter kafkaStreamStreamListenerResultAdapter() {
return new KStreamStreamListenerResultAdapter();
}
@Bean
public KStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter(
KStreamBoundMessageConversionDelegate kstreamBoundMessageConversionDelegate, KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue) {
return new KStreamListenerParameterAdapter(kstreamBoundMessageConversionDelegate, KStreamBindingInformationCatalogue);
}
@Bean
public KStreamListenerSetupMethodOrchestrator kStreamListenerSetupMethodOrchestrator(
BindingServiceProperties bindingServiceProperties,
KStreamExtendedBindingProperties kStreamExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue,
KStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KStreamBinderConfigurationProperties binderConfigurationProperties) {
return new KStreamListenerSetupMethodOrchestrator(bindingServiceProperties,
kStreamExtendedBindingProperties, keyValueSerdeResolver, kStreamBindingInformationCatalogue,
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters, binderConfigurationProperties);
}
@Bean
public KStreamBoundMessageConversionDelegate messageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory,
SendToDlqAndContinue sendToDlqAndContinue,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue,
KStreamBinderConfigurationProperties binderConfigurationProperties) {
return new KStreamBoundMessageConversionDelegate(compositeMessageConverterFactory, sendToDlqAndContinue,
KStreamBindingInformationCatalogue, binderConfigurationProperties);
}
@Bean
public KStreamBoundElementFactory kafkaStreamBindableTargetFactory(BindingServiceProperties bindingServiceProperties,
KStreamBindingInformationCatalogue KStreamBindingInformationCatalogue) {
return new KStreamBoundElementFactory(bindingServiceProperties,
KStreamBindingInformationCatalogue);
}
@Bean
public KTableBoundElementFactory kTableBoundElementFactory() {
return new KTableBoundElementFactory();
}
@Bean
public SendToDlqAndContinue kStreamDlqSender() {
return new SendToDlqAndContinue();
}
@Bean
public KStreamBindingInformationCatalogue boundedKStreamRegistryService() {
return new KStreamBindingInformationCatalogue();
}
@Bean
@SuppressWarnings("unchecked")
public KeyValueSerdeResolver keyValueSerdeResolver(@Qualifier("streamConfigGlobalProperties") Object streamConfigGlobalProperties,
KStreamBinderConfigurationProperties kStreamBinderConfigurationProperties) {
return new KeyValueSerdeResolver((Map<String,Object>)streamConfigGlobalProperties, kStreamBinderConfigurationProperties);
}
@Bean
public QueryableStoreRegistry queryableStoreTypeRegistry() {
return new QueryableStoreRegistry();
}
@Bean
public StreamsBuildersLifecycle streamsBuildersLifecycle(KStreamBindingInformationCatalogue kStreamBindingInformationCatalogue,
QueryableStoreRegistry queryableStoreRegistry){
return new StreamsBuildersLifecycle(kStreamBindingInformationCatalogue, queryableStoreRegistry);
}
}

View File

@@ -1,6 +0,0 @@
kstream:\
org.springframework.cloud.stream.binder.kstream.config.KStreamBinderConfiguration
ktable:\
org.springframework.cloud.stream.binder.kstream.config.KTableBinderConfiguration

View File

@@ -1,5 +0,0 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportAutoConfiguration

View File

@@ -1,10 +0,0 @@
spring.cloud.stream.bindings.input.destination=words
spring.cloud.stream.bindings.output.destination=counts
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.kstream.timeWindow.length=5000
spring.cloud.stream.kstream.timeWindow.advanceBy=0