Compare commits

...

14 Commits

Author SHA1 Message Date
Soby Chacko
74a044b0c0 2.1.0.M3 Release 2018-09-21 11:00:17 -04:00
Soby Chacko
70f385553a Application ID resolution for Kafka streams binder (#450)
* Application ID resolution for kafka streams binder

Avoid the need to rely on the group property for the application id.
First, check the binding-specific application id; if not set, look at the defaults.
If neither is set, fall back to the default application id set by the
Boot auto-configuration.

Modify tests.
Update docs.

Resolves #448

* Addressing PR review comments

* Minor polishing

* Addressing PR review comments
2018-09-21 10:17:34 -04:00
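
A hedged sketch of the resolution order this commit describes (the binding name `input` and all values are illustrative; the most specific id that is set wins):

[source]
----
# 1. binding-specific application id, checked first
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=per-binding-id
# 2. default application id shared by all input bindings
spring.cloud.stream.kafka.streams.default.consumer.applicationId=default-id
# 3. binder-level application id
spring.cloud.stream.kafka.streams.binder.applicationId=binder-id
# fallback: the application id set by the Boot auto-configuration,
# derived from spring.application.name when nothing else is set
spring.application.name=my-streams-app
----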
Soby Chacko
f5739a9c7f Missing beans registration in Kafka Streams binder
There is duplicate code in various binder configurations where we register missing beans.
Consolidate it into a common class that implements ImportBeanDefinitionRegistrar and then
import this class in the binder configurations.

Resolves #445
2018-09-20 11:28:32 -04:00
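
For readers unfamiliar with the mechanism, a minimal sketch of the pattern this commit consolidates into; the class and bean names here are illustrative, not the binder's actual ones:

[source,java]
----
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;

// Registers a bean definition only when it is not already present,
// so several configurations can import it without duplicating the check.
class MissingBeansRegistrar implements ImportBeanDefinitionRegistrar {

	@Override
	public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
		if (!registry.containsBeanDefinition("sharedBean")) {
			registry.registerBeanDefinition("sharedBean",
					BeanDefinitionBuilder.genericBeanDefinition(SharedBean.class).getBeanDefinition());
		}
	}
}

class SharedBean {
}

// Each binder configuration imports the registrar instead of repeating the logic.
@Configuration
@Import(MissingBeansRegistrar.class)
class SomeBinderConfiguration {
}
----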
Soby Chacko
39f5490488 Refactor bootstrap server configuration
There is a common piece of code repeated in both the producer and consumer
configuration where it populates the bootstrap server configuration.
Refactor it into a common method.

Resolves #208
2018-09-19 16:15:19 -04:00
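
The duplicated logic being extracted overrides Spring Boot's default `localhost:9092` bootstrap servers with the brokers configured on the binder; a sketch of the relevant property (value illustrative):

[source]
----
# brokers configured at the binder level; these replace Boot's default
# localhost:9092 in both the producer and consumer configurations
spring.cloud.stream.kafka.binder.brokers=broker1:9092,broker2:9092
----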
Soby Chacko
d445a2bff9 Docs for GlobalKTable binding
Resolves #443
2018-09-19 15:05:48 -04:00
Oleg Zhurakousky
5446485cbf Updating with changes related to core GH-1484
Resolves #447
2018-09-19 10:45:04 +02:00
Soby Chacko
cc61d916b8 Extended default properties
Allow applications to configure default values for extended producer and
consumer properties across multiple bindings in order to avoid repetition.

Address the changes in both Kafka and Kafka Streams binders.

Add integration test to verify both binding specific and default extended properties.

Resolves #444
Requires https://github.com/spring-cloud/spring-cloud-stream/pull/1477
2018-09-17 20:09:44 -04:00
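
A hedged sketch of the feature (the binding name `input` is hypothetical): a value under the defaults prefix applies to every binding unless a binding-specific value overrides it.

[source]
----
# default extended consumer property for all bindings of the Kafka binder
spring.cloud.stream.kafka.default.consumer.autoCommitOffset=false
# a binding-specific value still wins over the default
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=true
----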
Soby Chacko
41d9136812 GH-384: Binding support for GlobalKTable
Resolves spring-cloud/spring-cloud-stream-binder-kafka#384

* New binding targets and binder implementation for GlobalKTable within kafka-streams binder
* Refactoring existing structure to accommodate the new binder
* Adding integration test to verify the GlobalKTable behavior

Resolves #384

* Addressing PR review comments

* Update spring-kafka to 2.2.0.M3
Addressing PR review comments
Polishing

* Addressing PR review comments

* Addressing PR review comments
2018-09-14 10:51:15 -04:00
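
A sketch of what the new binding target allows (interface and binding names are hypothetical); since the binder creates only input bindings for GlobalKTable, it may appear with `@Input` but never with `@Output`:

[source,java]
----
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

interface MyProcessor {

	@Input("input")
	KStream<?, ?> input();

	// GlobalKTable is consumer-only in this binder
	@Input("lookupTable")
	GlobalKTable<?, ?> lookupTable();

	@Output("output")
	KStream<?, ?> output();
}
----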
Soby Chacko
71a6a6cf28 Package structure changes for StreamBuilderFactoryBean 2018-09-12 12:05:52 -04:00
Gary Russell
36361d19bf GH-1459: Pollable Consumer and Requeue
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/1459

Requires https://github.com/spring-cloud/spring-cloud-stream/pull/1467
2018-09-10 10:22:04 -04:00
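
A minimal sketch, assuming the `RequeueCurrentMessageException` mechanism introduced by the required core pull request: throwing it from a pollable consumer's handler asks the binder to reject and requeue the current message.

[source,java]
----
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.messaging.Message;

final class RequeueExample {

	static void pollOnce(PollableMessageSource source) {
		source.poll(message -> {
			try {
				process(message);
			}
			catch (RuntimeException e) {
				// signals the binder to requeue the current message
				throw new RequeueCurrentMessageException(e);
			}
		});
	}

	private static void process(Message<?> message) {
		// application logic goes here
	}
}
----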
Soby Chacko
e869516e3d Handling tombstones in kafka streams binder
Handle tombstones gracefully in the kafka streams binder
Modify tests to verify

Resolves spring-cloud/spring-cloud-stream-binder-kafka#294

* Addressing PR review comments

* Addressing PR review comments
2018-09-05 12:17:54 -04:00
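
In Kafka, a tombstone is a record whose value is `null`; a minimal sketch (not the binder's actual code) of the kind of null-value awareness the commit adds:

[source,java]
----
import org.apache.kafka.streams.kstream.KStream;

final class TombstoneAware {

	// A tombstone record carries a null value, so downstream logic must
	// not dereference the value without checking it first.
	static KStream<String, String> dropTombstones(KStream<String, String> stream) {
		return stream.filter((key, value) -> value != null);
	}
}
----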
Gary Russell
ea288c62eb GH-435: useNativeEncoding and Transactions
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/435

All common properties should be settable on the transactional producer.
2018-09-04 15:56:26 -04:00
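
With the `CombinedProducerProperties` introduced in this change set, common producer properties become settable under the transaction prefix alongside the Kafka-specific ones; a hedged sketch (values illustrative):

[source]
----
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
# a common (non-Kafka-specific) producer property, now honored for the
# transactional producer
spring.cloud.stream.kafka.binder.transaction.producer.useNativeEncoding=true
# Kafka-specific producer properties continue to work as before
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
----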
Soby Chacko
afa337f718 Remove deprecations in tests
Resolves #433
2018-09-04 15:28:36 -04:00
Soby Chacko
e4b08e888e Next update version: 2.1.0.BUILD-SNAPSHOT 2018-08-28 10:15:08 -04:00
48 changed files with 1586 additions and 292 deletions

View File

@@ -2,20 +2,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M2</version>
<version>2.1.0.M3</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.1.0.M1</version>
<version>2.1.0.M2</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.2.0.M2</spring-kafka.version>
<spring-kafka.version>2.2.0.M3</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.M1</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.1.0.M2</spring-cloud-stream.version>
<spring-cloud-stream.version>2.1.0.M3</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M2</version>
<version>2.1.0.M3</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M2</version>
<version>2.1.0.M3</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -21,12 +21,22 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.BeansException;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties.CompressionType;
import org.springframework.cloud.stream.config.MergableProperties;
import org.springframework.expression.Expression;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -125,6 +135,10 @@ public class KafkaBinderConfigurationProperties {
this.kafkaProperties = kafkaProperties;
}
public KafkaProperties getKafkaProperties() {
return kafkaProperties;
}
public Transaction getTransaction() {
return this.transaction;
}
@@ -515,21 +529,7 @@ public class KafkaBinderConfigurationProperties {
consumerConfiguration.putAll(this.consumerProperties);
// Override Spring Boot bootstrap server setting if left to default with the value
// configured in the binder
if (ObjectUtils.isEmpty(consumerConfiguration.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaConnectionString());
}
else {
Object boostrapServersConfig = consumerConfiguration.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (boostrapServersConfig instanceof List) {
@SuppressWarnings("unchecked")
List<String> bootStrapServers = (List<String>) consumerConfiguration
.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers.size() == 1 && bootStrapServers.get(0).equals("localhost:9092")) {
consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaConnectionString());
}
}
}
return Collections.unmodifiableMap(consumerConfiguration);
return getConfigurationWithBootstrapServer(consumerConfiguration, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
/**
@@ -550,21 +550,25 @@ public class KafkaBinderConfigurationProperties {
producerConfiguration.putAll(this.producerProperties);
// Override Spring Boot bootstrap server setting if left to default with the value
// configured in the binder
if (ObjectUtils.isEmpty(producerConfiguration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaConnectionString());
return getConfigurationWithBootstrapServer(producerConfiguration, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
private Map<String, Object> getConfigurationWithBootstrapServer(Map<String, Object> configuration, String bootstrapServersConfig) {
if (ObjectUtils.isEmpty(configuration.get(bootstrapServersConfig))) {
configuration.put(bootstrapServersConfig, getKafkaConnectionString());
}
else {
Object boostrapServersConfig = producerConfiguration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
Object boostrapServersConfig = configuration.get(bootstrapServersConfig);
if (boostrapServersConfig instanceof List) {
@SuppressWarnings("unchecked")
List<String> bootStrapServers = (List<String>) producerConfiguration
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
List<String> bootStrapServers = (List<String>) configuration
.get(bootstrapServersConfig);
if (bootStrapServers.size() == 1 && bootStrapServers.get(0).equals("localhost:9092")) {
producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaConnectionString());
configuration.put(bootstrapServersConfig, getKafkaConnectionString());
}
}
}
return Collections.unmodifiableMap(producerConfiguration);
return Collections.unmodifiableMap(configuration);
}
public JaasLoginModuleConfiguration getJaas() {
@@ -585,7 +589,7 @@ public class KafkaBinderConfigurationProperties {
public static class Transaction {
private final KafkaProducerProperties producer = new KafkaProducerProperties();
private final CombinedProducerProperties producer = new CombinedProducerProperties();
private String transactionIdPrefix;
@@ -597,10 +601,183 @@ public class KafkaBinderConfigurationProperties {
this.transactionIdPrefix = transactionIdPrefix;
}
public KafkaProducerProperties getProducer() {
public CombinedProducerProperties getProducer() {
return this.producer;
}
}
/**
* A combination of {@link ProducerProperties} and {@link KafkaProducerProperties}
* so that common and kafka-specific properties can be set for the transactional
* producer.
* @since 2.1
*/
public static class CombinedProducerProperties {
private final ProducerProperties producerProperties = new ProducerProperties();
private final KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
public void merge(MergableProperties mergable) {
this.producerProperties.merge(mergable);
}
public Expression getPartitionKeyExpression() {
return this.producerProperties.getPartitionKeyExpression();
}
public void setPartitionKeyExpression(Expression partitionKeyExpression) {
this.producerProperties.setPartitionKeyExpression(partitionKeyExpression);
}
public boolean isPartitioned() {
return this.producerProperties.isPartitioned();
}
public void copyProperties(Object source, Object target) throws BeansException {
this.producerProperties.copyProperties(source, target);
}
public Expression getPartitionSelectorExpression() {
return this.producerProperties.getPartitionSelectorExpression();
}
public void setPartitionSelectorExpression(Expression partitionSelectorExpression) {
this.producerProperties.setPartitionSelectorExpression(partitionSelectorExpression);
}
public @Min(value = 1, message = "Partition count should be greater than zero.") int getPartitionCount() {
return this.producerProperties.getPartitionCount();
}
public void setPartitionCount(int partitionCount) {
this.producerProperties.setPartitionCount(partitionCount);
}
public String[] getRequiredGroups() {
return this.producerProperties.getRequiredGroups();
}
public void setRequiredGroups(String... requiredGroups) {
this.producerProperties.setRequiredGroups(requiredGroups);
}
public @AssertTrue(message = "Partition key expression and partition key extractor class properties are mutually exclusive.") boolean isValidPartitionKeyProperty() {
return this.producerProperties.isValidPartitionKeyProperty();
}
public @AssertTrue(message = "Partition selector class and partition selector expression properties are mutually exclusive.") boolean isValidPartitionSelectorProperty() {
return this.producerProperties.isValidPartitionSelectorProperty();
}
public HeaderMode getHeaderMode() {
return this.producerProperties.getHeaderMode();
}
public void setHeaderMode(HeaderMode headerMode) {
this.producerProperties.setHeaderMode(headerMode);
}
public boolean isUseNativeEncoding() {
return this.producerProperties.isUseNativeEncoding();
}
public void setUseNativeEncoding(boolean useNativeEncoding) {
this.producerProperties.setUseNativeEncoding(useNativeEncoding);
}
public boolean isErrorChannelEnabled() {
return this.producerProperties.isErrorChannelEnabled();
}
public void setErrorChannelEnabled(boolean errorChannelEnabled) {
this.producerProperties.setErrorChannelEnabled(errorChannelEnabled);
}
public String getPartitionKeyExtractorName() {
return this.producerProperties.getPartitionKeyExtractorName();
}
public void setPartitionKeyExtractorName(String partitionKeyExtractorName) {
this.producerProperties.setPartitionKeyExtractorName(partitionKeyExtractorName);
}
public String getPartitionSelectorName() {
return this.producerProperties.getPartitionSelectorName();
}
public void setPartitionSelectorName(String partitionSelectorName) {
this.producerProperties.setPartitionSelectorName(partitionSelectorName);
}
public int getBufferSize() {
return this.kafkaProducerProperties.getBufferSize();
}
public void setBufferSize(int bufferSize) {
this.kafkaProducerProperties.setBufferSize(bufferSize);
}
public @NotNull CompressionType getCompressionType() {
return this.kafkaProducerProperties.getCompressionType();
}
public void setCompressionType(CompressionType compressionType) {
this.kafkaProducerProperties.setCompressionType(compressionType);
}
public boolean isSync() {
return this.kafkaProducerProperties.isSync();
}
public void setSync(boolean sync) {
this.kafkaProducerProperties.setSync(sync);
}
public int getBatchTimeout() {
return this.kafkaProducerProperties.getBatchTimeout();
}
public void setBatchTimeout(int batchTimeout) {
this.kafkaProducerProperties.setBatchTimeout(batchTimeout);
}
public Expression getMessageKeyExpression() {
return this.kafkaProducerProperties.getMessageKeyExpression();
}
public void setMessageKeyExpression(Expression messageKeyExpression) {
this.kafkaProducerProperties.setMessageKeyExpression(messageKeyExpression);
}
public String[] getHeaderPatterns() {
return this.kafkaProducerProperties.getHeaderPatterns();
}
public void setHeaderPatterns(String[] headerPatterns) {
this.kafkaProducerProperties.setHeaderPatterns(headerPatterns);
}
public Map<String, String> getConfiguration() {
return this.kafkaProducerProperties.getConfiguration();
}
public void setConfiguration(Map<String, String> configuration) {
this.kafkaProducerProperties.setConfiguration(configuration);
}
public KafkaAdminProperties getAdmin() {
return this.kafkaProducerProperties.getAdmin();
}
public void setAdmin(KafkaAdminProperties admin) {
this.kafkaProducerProperties.setAdmin(admin);
}
public KafkaProducerProperties getExtension() {
return this.kafkaProducerProperties;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,10 +16,13 @@
package org.springframework.cloud.stream.binder.kafka.properties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/**
* @author Marius Bogoevici
* @author Oleg Zhurakousky
*/
public class KafkaBindingProperties {
public class KafkaBindingProperties implements BinderSpecificPropertiesProvider {
private KafkaConsumerProperties consumer = new KafkaConsumerProperties();

View File

@@ -19,6 +19,8 @@ package org.springframework.cloud.stream.binder.kafka.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.cloud.stream.config.MergableProperties;
/**
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
@@ -29,7 +31,7 @@ import java.util.Map;
* Thanks to Laszlo Szabo for providing the initial patch for generic property support.
* </p>
*/
public class KafkaConsumerProperties {
public class KafkaConsumerProperties implements MergableProperties {
public enum StartOffset {
earliest(-2L),

View File

@@ -20,16 +20,20 @@ import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
* @author Marius Bogoevici
* @author Gary Russell
* @author Soby Chacko
*/
@ConfigurationProperties("spring.cloud.stream.kafka")
public class KafkaExtendedBindingProperties
implements ExtendedBindingProperties<KafkaConsumerProperties, KafkaProducerProperties> {
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.kafka.default";
private Map<String, KafkaBindingProperties> bindings = new HashMap<>();
public Map<String, KafkaBindingProperties> getBindings() {
@@ -82,4 +86,14 @@ public class KafkaExtendedBindingProperties
}
}
@Override
public String getDefaultsPrefix() {
return DEFAULTS_PREFIX;
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return KafkaBindingProperties.class;
}
}

View File

@@ -21,6 +21,7 @@ import java.util.Map;
import javax.validation.constraints.NotNull;
import org.springframework.cloud.stream.config.MergableProperties;
import org.springframework.expression.Expression;
/**
@@ -28,7 +29,7 @@ import org.springframework.expression.Expression;
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaProducerProperties {
public class KafkaProducerProperties implements MergableProperties {
private int bufferSize = 16384;

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M2</version>
<version>2.1.0.M3</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -20,13 +20,15 @@ https://kafka.apache.org/documentation/streams/developer-guide[Apache Kafka Stre
Kafka Streams binder implementation builds on the foundation provided by the http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafka Streams in Spring Kafka]
project.
Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams - KStream, KTable and GlobalKTable.
As part of this native integration, the high-level https://docs.confluent.io/current/streams/developer-guide/dsl-api.html[Streams DSL]
provided by the Kafka Streams API is available for use in the business logic, too.
provided by the Kafka Streams API is available for use in the business logic.
An early version of the https://docs.confluent.io/current/streams/developer-guide/processor-api.html[Processor API]
support is available as well.
As noted early-on, Kafka Streams support in Spring Cloud Stream strictly only available for use in the Processor model.
As noted early-on, Kafka Streams support in Spring Cloud Stream is strictly only available for use in the Processor model.
A model in which messages are read from an inbound topic, business processing is applied, and the transformed messages
are written to an outbound topic. It can also be used in Processor applications with no outbound destination.
@@ -113,14 +115,13 @@ serdeError::
+
Default: `logAndFail`
applicationId::
Application ID for all the stream configurations in the current application context.
You can override the application id for an individual `StreamListener` method using the `group` property on the binding.
You have to ensure that you are using the same group name for all input bindings in the case of multiple inputs on the same methods.
Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
If the application contains multiple `StreamListener` methods, then application.id should be set at the binding level per input binding.
+
Default: `default`
Default: `none`
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
literal.
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.` literal.
For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.producer.`.
keySerde::
key serde to use
@@ -135,9 +136,13 @@ useNativeEncoding::
+
Default: `false`.
The following properties are _only_ available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`
literal.
The following properties are _only_ available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.` literal.
For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.consumer.`.
applicationId::
Setting application.id per input binding.
+
Default: `none`
keySerde::
key serde to use
+
@@ -218,6 +223,12 @@ through the following property.
spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs
----
The above example shows the use of KTable as an input binding.
The binder also supports input bindings for GlobalKTable.
GlobalKTable binding is useful when you have to ensure that all instances of your application have access to the data updates from the topic.
KTable and GlobalKTable bindings are only available on the input.
The binder supports both input and output bindings for KStream.
=== Multiple Input Bindings as a Processor
[source]

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M2</version>
<version>2.1.0.M3</version>
</parent>
<dependencies>

View File

@@ -0,0 +1,104 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.util.StringUtils;
/**
* An {@link AbstractBinder} implementation for {@link GlobalKTable}.
*
* Provides only consumer binding for the bound {@link GlobalKTable}.
* Output bindings are not allowed on this binder.
*
* @author Soby Chacko
* @since 2.1.0
*/
public class GlobalKTableBinder extends
AbstractBinder<GlobalKTable<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
implements ExtendedPropertiesBinder<GlobalKTable<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
private final KafkaTopicProvisioner kafkaTopicProvisioner;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
public GlobalKTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
}
@Override
@SuppressWarnings("unchecked")
protected Binding<GlobalKTable<Object, Object>> doBindConsumer(String name, String group, GlobalKTable<Object, Object> inputTarget,
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
}
KafkaStreamsBinderUtils.prepareConsumerBinding(name, group, inputTarget,
getApplicationContext(),
kafkaTopicProvisioner,
kafkaStreamsBindingInformationCatalogue,
binderConfigurationProperties, properties);
return new DefaultBinding<>(name, group, inputTarget, null);
}
@Override
protected Binding<GlobalKTable<Object, Object>> doBindProducer(String name, GlobalKTable<Object, Object> outboundBindTarget,
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
throw new UnsupportedOperationException("No producer level binding is allowed for GlobalKTable");
}
@Override
public KafkaStreamsConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public KafkaStreamsProducerProperties getExtendedProducerProperties(String channelName) {
throw new UnsupportedOperationException("No producer binding is allowed and therefore no properties");
}
@Override
public String getDefaultsPrefix() {
return this.kafkaStreamsExtendedBindingProperties.getDefaultsPrefix();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.kafkaStreamsExtendedBindingProperties.getExtendedPropertiesEntryClass();
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* @author Soby Chacko
* @since 2.1.0
*/
@Configuration
@Import(KafkaStreamsBinderUtils.KafkaStreamsMissingBeansRegistrar.class)
public class GlobalKTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
}
@Bean
public GlobalKTableBinder GlobalKTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
return new GlobalKTableBinder(binderConfigurationProperties, kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue);
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binding.AbstractBindingTargetFactory;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.util.Assert;
/**
* {@link org.springframework.cloud.stream.binding.BindingTargetFactory} for {@link GlobalKTable}
*
Only input bindings are created, as output bindings on GlobalKTable are not allowed.
*
* @author Soby Chacko
* @since 2.1.0
*/
public class GlobalKTableBoundElementFactory extends AbstractBindingTargetFactory<GlobalKTable> {
private final BindingServiceProperties bindingServiceProperties;
GlobalKTableBoundElementFactory(BindingServiceProperties bindingServiceProperties) {
super(GlobalKTable.class);
this.bindingServiceProperties = bindingServiceProperties;
}
@Override
public GlobalKTable createInput(String name) {
ConsumerProperties consumerProperties = this.bindingServiceProperties.getConsumerProperties(name);
//Always set multiplex to true in the kafka streams binder
consumerProperties.setMultiplex(true);
GlobalKTableBoundElementFactory.GlobalKTableWrapperHandler wrapper = new GlobalKTableBoundElementFactory.GlobalKTableWrapperHandler();
ProxyFactory proxyFactory = new ProxyFactory(GlobalKTableBoundElementFactory.GlobalKTableWrapper.class, GlobalKTable.class);
proxyFactory.addAdvice(wrapper);
return (GlobalKTable) proxyFactory.getProxy();
}
@Override
public GlobalKTable createOutput(String name) {
throw new UnsupportedOperationException("Outbound operations are not allowed on target type GlobalKTable");
}
public interface GlobalKTableWrapper {
void wrap(GlobalKTable<Object, Object> delegate);
}
private static class GlobalKTableWrapperHandler implements GlobalKTableBoundElementFactory.GlobalKTableWrapper, MethodInterceptor {
private GlobalKTable<Object, Object> delegate;
public void wrap(GlobalKTable<Object, Object> delegate) {
Assert.notNull(delegate, "delegate cannot be null");
Assert.isNull(this.delegate, "delegate already set to " + this.delegate);
this.delegate = delegate;
}
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (methodInvocation.getMethod().getDeclaringClass().equals(GlobalKTable.class)) {
Assert.notNull(delegate, "Trying to prepareConsumerBinding " + methodInvocation
.getMethod() + " but no delegate has been set.");
return methodInvocation.getMethod().invoke(delegate, methodInvocation.getArguments());
}
else if (methodInvocation.getMethod().getDeclaringClass().equals(GlobalKTableBoundElementFactory.GlobalKTableWrapper.class)) {
return methodInvocation.getMethod().invoke(this, methodInvocation.getArguments());
}
else {
throw new IllegalStateException("Only GlobalKTable method invocations are permitted");
}
}
}
}

View File

@@ -19,18 +19,16 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
@@ -62,7 +60,7 @@ class KStreamBinder extends
private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
private final KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final KeyValueSerdeResolver keyValueSerdeResolver;
@@ -74,7 +72,7 @@ class KStreamBinder extends
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.KafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.keyValueSerdeResolver = keyValueSerdeResolver;
}
@@ -83,42 +81,15 @@ class KStreamBinder extends
protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
KStream<Object, Object> inputTarget,
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
this.KafkaStreamsBindingInformationCatalogue.registerConsumerProperties(inputTarget, properties.getExtension());
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
properties.getExtension());
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
extendedConsumerProperties.getExtension().setEnableDlq(true);
}
this.kafkaStreamsBindingInformationCatalogue.registerConsumerProperties(inputTarget, properties.getExtension());
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
}
String[] inputTopics = StringUtils.commaDelimitedListToStringArray(name);
for (String inputTopic : inputTopics) {
this.kafkaTopicProvisioner.provisionConsumerDestination(inputTopic, group, extendedConsumerProperties);
}
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
StreamsConfig streamsConfig = this.KafkaStreamsBindingInformationCatalogue.getStreamsConfig(inputTarget);
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = !StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName()) ?
new KafkaStreamsDlqDispatch(extendedConsumerProperties.getExtension().getDlqName(), binderConfigurationProperties,
extendedConsumerProperties.getExtension()) : null;
for (String inputTopic : inputTopics) {
if (StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName())) {
String dlqName = "error." + inputTopic + "." + group;
kafkaStreamsDlqDispatch = new KafkaStreamsDlqDispatch(dlqName, binderConfigurationProperties,
extendedConsumerProperties.getExtension());
}
SendToDlqAndContinue sendToDlqAndContinue = this.getApplicationContext().getBean(SendToDlqAndContinue.class);
sendToDlqAndContinue.addKStreamDlqDispatch(inputTopic, kafkaStreamsDlqDispatch);
DeserializationExceptionHandler deserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
if (deserializationExceptionHandler instanceof SendToDlqAndContinue) {
((SendToDlqAndContinue) deserializationExceptionHandler).addKStreamDlqDispatch(inputTopic, kafkaStreamsDlqDispatch);
}
}
}
KafkaStreamsBinderUtils.prepareConsumerBinding(name, group, inputTarget,
getApplicationContext(),
kafkaTopicProvisioner,
kafkaStreamsBindingInformationCatalogue,
binderConfigurationProperties, properties);
return new DefaultBinding<>(name, group, inputTarget, null);
}
@@ -162,4 +133,14 @@ class KStreamBinder extends
public void setKafkaStreamsExtendedBindingProperties(KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
}
@Override
public String getDefaultsPrefix() {
return this.kafkaStreamsExtendedBindingProperties.getDefaultsPrefix();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.kafkaStreamsExtendedBindingProperties.getExtendedPropertiesEntryClass();
}
}

View File

@@ -16,18 +16,20 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.beans.factory.config.MethodInvokingFactoryBean;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.type.AnnotationMetadata;
/**
* @author Marius Bogoevici
@@ -35,25 +37,45 @@ import org.springframework.context.annotation.Import;
* @author Soby Chacko
*/
@Configuration
@Import({KafkaAutoConfiguration.class})
@Import({KafkaAutoConfiguration.class, KStreamBinderConfiguration.KStreamMissingBeansRegistrar.class})
public class KStreamBinderConfiguration {
@Bean
@ConditionalOnBean(name = "outerContext")
public BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
return beanFactory -> {
ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBinderConfigurationProperties.class));
beanFactory.registerSingleton(KafkaStreamsMessageConversionDelegate.class.getSimpleName(), outerContext
.getBean(KafkaStreamsMessageConversionDelegate.class));
beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBindingInformationCatalogue.class));
beanFactory.registerSingleton(KeyValueSerdeResolver.class.getSimpleName(), outerContext
.getBean(KeyValueSerdeResolver.class));
beanFactory.registerSingleton(KafkaStreamsExtendedBindingProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsExtendedBindingProperties.class));
};
static class KStreamMissingBeansRegistrar extends KafkaStreamsBinderUtils.KafkaStreamsMissingBeansRegistrar {
private static final String BEAN_NAME = "outerContext";
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
super.registerBeanDefinitions(importingClassMetadata, registry);
if (registry.containsBeanDefinition(BEAN_NAME)) {
AbstractBeanDefinition conversionDelegateBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsMessageConversionDelegate.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsMessageConversionDelegate.class.getSimpleName(), conversionDelegateBean);
AbstractBeanDefinition keyValueSerdeResolverBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KeyValueSerdeResolver.class)
.getBeanDefinition();
registry.registerBeanDefinition(KeyValueSerdeResolver.class.getSimpleName(), keyValueSerdeResolverBean);
AbstractBeanDefinition kafkaStreamsExtendedBindingPropertiesBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsExtendedBindingProperties.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsExtendedBindingProperties.class.getSimpleName(), kafkaStreamsExtendedBindingPropertiesBean);
}
}
}
@Bean

View File

@@ -95,7 +95,7 @@ class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KStream> {
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (methodInvocation.getMethod().getDeclaringClass().equals(KStream.class)) {
Assert.notNull(delegate, "Trying to invoke " + methodInvocation
Assert.notNull(delegate, "Trying to prepareConsumerBinding " + methodInvocation
.getMethod() + " but no delegate has been set.");
return methodInvocation.getMethod().invoke(delegate, methodInvocation.getArguments());
}

View File

@@ -16,17 +16,15 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
@@ -50,7 +48,7 @@ class KTableBinder extends
private final KafkaTopicProvisioner kafkaTopicProvisioner;
private final KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
@@ -58,48 +56,21 @@ class KTableBinder extends
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.KafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
}
@Override
@SuppressWarnings("unchecked")
protected Binding<KTable<Object, Object>> doBindConsumer(String name, String group, KTable<Object, Object> inputTarget,
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
properties.getExtension());
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
extendedConsumerProperties.getExtension().setEnableDlq(true);
}
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
}
String[] inputTopics = StringUtils.commaDelimitedListToStringArray(name);
for (String inputTopic : inputTopics) {
this.kafkaTopicProvisioner.provisionConsumerDestination(inputTopic, group, extendedConsumerProperties);
}
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
StreamsConfig streamsConfig = this.KafkaStreamsBindingInformationCatalogue.getStreamsConfig(inputTarget);
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = !StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName()) ?
new KafkaStreamsDlqDispatch(extendedConsumerProperties.getExtension().getDlqName(), binderConfigurationProperties,
extendedConsumerProperties.getExtension()) : null;
for (String inputTopic : inputTopics) {
if (StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName())) {
String dlqName = "error." + inputTopic + "." + group;
kafkaStreamsDlqDispatch = new KafkaStreamsDlqDispatch(dlqName, binderConfigurationProperties,
extendedConsumerProperties.getExtension());
}
SendToDlqAndContinue sendToDlqAndContinue = this.getApplicationContext().getBean(SendToDlqAndContinue.class);
sendToDlqAndContinue.addKStreamDlqDispatch(inputTopic, kafkaStreamsDlqDispatch);
DeserializationExceptionHandler deserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
if (deserializationExceptionHandler instanceof SendToDlqAndContinue) {
((SendToDlqAndContinue) deserializationExceptionHandler).addKStreamDlqDispatch(inputTopic, kafkaStreamsDlqDispatch);
}
}
}
KafkaStreamsBinderUtils.prepareConsumerBinding(name, group, inputTarget,
getApplicationContext(),
kafkaTopicProvisioner,
kafkaStreamsBindingInformationCatalogue,
binderConfigurationProperties, properties);
return new DefaultBinding<>(name, group, inputTarget, null);
}
@@ -118,4 +89,14 @@ class KTableBinder extends
public KafkaStreamsProducerProperties getExtendedProducerProperties(String channelName) {
return this.kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties(channelName);
}
@Override
public String getDefaultsPrefix() {
return this.kafkaStreamsExtendedBindingProperties.getDefaultsPrefix();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.kafkaStreamsExtendedBindingProperties.getExtendedPropertiesEntryClass();
}
}

View File

@@ -25,11 +25,14 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* @author Soby Chacko
*/
@SuppressWarnings("ALL")
@Configuration
@Import(KafkaStreamsBinderUtils.KafkaStreamsMissingBeansRegistrar.class)
public class KTableBinderConfiguration {
@Bean

View File

@@ -78,7 +78,7 @@ class KTableBoundElementFactory extends AbstractBindingTargetFactory<KTable> {
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
if (methodInvocation.getMethod().getDeclaringClass().equals(KTable.class)) {
Assert.notNull(delegate, "Trying to invoke " + methodInvocation
Assert.notNull(delegate, "Trying to prepareConsumerBinding " + methodInvocation
.getMethod() + " but no delegate has been set.");
return methodInvocation.getMethod().invoke(delegate, methodInvocation.getArguments());
}
@@ -86,7 +86,7 @@ class KTableBoundElementFactory extends AbstractBindingTargetFactory<KTable> {
return methodInvocation.getMethod().invoke(this, methodInvocation.getArguments());
}
else {
throw new IllegalStateException("Only KStream method invocations are permitted");
throw new IllegalStateException("Only KTable method invocations are permitted");
}
}
}

View File

@@ -17,8 +17,9 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
@@ -40,8 +41,11 @@ import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* @author Marius Bogoevici
@@ -59,30 +63,66 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
}
@Bean
public KafkaStreamsConfiguration kafkaStreamsConfiguration(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
Environment environment) {
KafkaProperties kafkaProperties = binderConfigurationProperties.getKafkaProperties();
Map<String, Object> streamsProperties = kafkaProperties.buildStreamsProperties();
if (kafkaProperties.getStreams().getApplicationId() == null) {
String applicationName = environment.getProperty("spring.application.name");
if (applicationName != null) {
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
}
}
return new KafkaStreamsConfiguration(streamsProperties);
}
@Bean("streamConfigGlobalProperties")
public Map<String, Object> streamConfigGlobalProperties(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, binderConfigurationProperties.getApplicationId());
public Map<String, Object> streamConfigGlobalProperties(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaStreamsConfiguration kafkaStreamsConfiguration) {
Properties properties = kafkaStreamsConfiguration.asProperties();
// Override Spring Boot bootstrap server setting if left to default with the value
// configured in the binder
if (ObjectUtils.isEmpty(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))) {
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString());
}
else {
Object bootstrapServerConfig = properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
if (bootstrapServerConfig instanceof String) {
@SuppressWarnings("unchecked")
String bootStrapServers = (String) properties
.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers.equals("localhost:9092")) {
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString());
}
}
}
String binderProvidedApplicationId = binderConfigurationProperties.getApplicationId();
if (StringUtils.hasText(binderProvidedApplicationId)) {
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, binderProvidedApplicationId);
}
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
} else if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
} else if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
SendToDlqAndContinue.class);
}
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) {
props.putAll(binderConfigurationProperties.getConfiguration());
properties.putAll(binderConfigurationProperties.getConfiguration());
}
return props;
return properties.entrySet().stream().collect(
Collectors.toMap(e -> String.valueOf(e.getKey()), Map.Entry::getValue));
}
@Bean
@@ -133,6 +173,11 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
return new KTableBoundElementFactory(bindingServiceProperties);
}
@Bean
public GlobalKTableBoundElementFactory globalKTableBoundElementFactory(BindingServiceProperties bindingServiceProperties) {
return new GlobalKTableBoundElementFactory(bindingServiceProperties);
}
@Bean
public SendToDlqAndContinue sendToDlqAndContinue() {
return new SendToDlqAndContinue();

View File

@@ -0,0 +1,109 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.springframework.beans.factory.config.MethodInvokingFactoryBean;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.StringUtils;
/**
* @author Soby Chacko
*/
class KafkaStreamsBinderUtils {
static void prepareConsumerBinding(String name, String group, Object inputTarget,
ApplicationContext context,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
properties.getExtension());
if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
extendedConsumerProperties.getExtension().setEnableDlq(true);
}
String[] inputTopics = StringUtils.commaDelimitedListToStringArray(name);
for (String inputTopic : inputTopics) {
kafkaTopicProvisioner.provisionConsumerDestination(inputTopic, group, extendedConsumerProperties);
}
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
StreamsConfig streamsConfig = kafkaStreamsBindingInformationCatalogue.getStreamsConfig(inputTarget);
KafkaStreamsDlqDispatch kafkaStreamsDlqDispatch = !StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName()) ?
new KafkaStreamsDlqDispatch(extendedConsumerProperties.getExtension().getDlqName(), binderConfigurationProperties,
extendedConsumerProperties.getExtension()) : null;
for (String inputTopic : inputTopics) {
if (StringUtils.isEmpty(extendedConsumerProperties.getExtension().getDlqName())) {
String dlqName = "error." + inputTopic + "." + group;
kafkaStreamsDlqDispatch = new KafkaStreamsDlqDispatch(dlqName, binderConfigurationProperties,
extendedConsumerProperties.getExtension());
}
SendToDlqAndContinue sendToDlqAndContinue = context.getBean(SendToDlqAndContinue.class);
sendToDlqAndContinue.addKStreamDlqDispatch(inputTopic, kafkaStreamsDlqDispatch);
DeserializationExceptionHandler deserializationExceptionHandler = streamsConfig.defaultDeserializationExceptionHandler();
if (deserializationExceptionHandler instanceof SendToDlqAndContinue) {
((SendToDlqAndContinue) deserializationExceptionHandler).addKStreamDlqDispatch(inputTopic, kafkaStreamsDlqDispatch);
}
}
}
}
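// A quick illustration of the DLQ naming above: when no dlqName is configured, the dispatch
// topic is derived per input topic as "error.<topic>.<group>". For example, destination
// "words" with group "group" yields "error.words.group" -- exactly one of the topics
// pre-created by DeserializationErrorHandlerByKafkaTests later in this diff.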
static class KafkaStreamsMissingBeansRegistrar implements ImportBeanDefinitionRegistrar {
private static final String BEAN_NAME = "outerContext";
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
if (registry.containsBeanDefinition(BEAN_NAME)) {
AbstractBeanDefinition configBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsBinderConfigurationProperties.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), configBean);
AbstractBeanDefinition catalogueBean = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingFactoryBean.class)
.addPropertyReference("targetObject", BEAN_NAME)
.addPropertyValue("targetMethod", "getBean")
.addPropertyValue("arguments", KafkaStreamsBindingInformationCatalogue.class)
.getBeanDefinition();
registry.registerBeanDefinition(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), catalogueBean);
}
}
}
}
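The registrar above bridges the binder's child application context back to the outer
(application) context: each MethodInvokingFactoryBean definition resolves, at refresh time,
to a getBean call on the context registered under the "outerContext" bean name. A minimal
sketch of the equivalent direct lookup, assuming outerContext is that parent ApplicationContext:

KafkaStreamsBinderConfigurationProperties binderProperties =
		outerContext.getBean(KafkaStreamsBinderConfigurationProperties.class);
KafkaStreamsBindingInformationCatalogue catalogue =
		outerContext.getBean(KafkaStreamsBindingInformationCatalogue.class);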


@@ -19,6 +19,8 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
@@ -43,6 +45,8 @@ import org.springframework.util.StringUtils;
*/
public class KafkaStreamsMessageConversionDelegate {
private final static Log LOG = LogFactory.getLog(KafkaStreamsMessageConversionDelegate.class);
private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();
private final CompositeMessageConverterFactory compositeMessageConverterFactory;
@@ -105,32 +109,34 @@ public class KafkaStreamsMessageConversionDelegate {
boolean isValidRecord = false;
try {
//if the record is a tombstone, ignore it and skip further processing.
if (o2 != null) {
if (valueClass.isAssignableFrom(o2.getClass())) {
keyValueThreadLocal.set(new KeyValue<>(o, o2));
} else if (o2 instanceof Message) {
if (valueClass.isAssignableFrom(((Message) o2).getPayload().getClass())) {
keyValueThreadLocal.set(new KeyValue<>(o, ((Message) o2).getPayload()));
} else {
convertAndSetMessage(o, valueClass, messageConverter, (Message) o2);
}
} else if (o2 instanceof String || o2 instanceof byte[]) {
Message<?> message = MessageBuilder.withPayload(o2).build();
convertAndSetMessage(o, valueClass, messageConverter, message);
} else {
keyValueThreadLocal.set(new KeyValue<>(o, o2));
}
isValidRecord = true;
} else {
LOG.info("Received a tombstone record. This will be skipped from further processing.");
}
}
catch (Exception ignored) {
//pass through
}
return isValidRecord;
},
//second filter that catches any messages for which an exception was thrown in the first filter above.
(k, v) -> true
);
//process errors from the second filter in the branch above.
@@ -164,21 +170,21 @@ public class KafkaStreamsMessageConversionDelegate {
@Override
public void process(Object o, Object o2) {
//Only continue if the record was not a tombstone.
if (o2 != null) {
if (kstreamBindingInformationCatalogue.isDlqEnabled(bindingTarget)) {
String destination = context.topic();
if (o2 instanceof Message) {
Message message = (Message) o2;
sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) message.getPayload(), context.partition());
} else {
sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) o2, context.partition());
}
} else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
throw new IllegalStateException("Inbound deserialization failed.");
} else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
//quietly pass through. No action needed; this is similar to log and continue.
}
}
}
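A note on tombstones: in Kafka, a tombstone is a record whose value is null; log-compacted
topics use it to delete a key. Both changes above let such records flow through untouched
instead of failing conversion or being routed to the DLQ. A minimal sketch of producing one,
mirroring sendTombStoneRecordsAndVerifyGracefulHandling in the word-count integration test
later in this diff:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault(null); // a null value is the tombstone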


@@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -37,14 +38,19 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsStateStore;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
@@ -57,14 +63,15 @@ import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.binding.StreamListenerSetupMethodOrchestrator;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.config.MergableProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
@@ -80,7 +87,7 @@ import org.springframework.util.StringUtils;
* The orchestration primarily focuses on the following areas:
*
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one output values on {@link SendTo}
* 2. Allow multiple inbound bindings for multiple KStream and/or KTable/GlobalKTable types.
* 3. Each StreamListener method that it orchestrates gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
*
* @author Soby Chacko
@@ -112,13 +119,13 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
private ConfigurableApplicationContext applicationContext;
KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
StreamListenerParameterAdapter streamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
CleanupConfig cleanupConfig) {
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
@@ -149,7 +156,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
for (int i = 0; i < method.getParameterCount(); i++) {
MethodParameter methodParameter = MethodParameter.forExecutable(method, i);
Class<?> parameterType = methodParameter.getParameterType();
if (parameterType.equals(KStream.class) || parameterType.equals(KTable.class)
|| parameterType.equals(GlobalKTable.class)) {
supports = true;
}
}
@@ -235,7 +243,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
//Retrieve the StreamsConfig created for this method if available.
//Otherwise, create the StreamsBuilderFactory and get the underlying config.
if (!methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
streamsConfig = buildStreamsBuilderAndRetrieveConfig(method, applicationContext, inboundName);
}
try {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = methodStreamsBuilderFactoryBeanMap.get(method);
@@ -282,6 +290,22 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
arguments[parameterIndex] = table;
}
else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
String materializedAs = extendedConsumerProperties.getMaterializedAs();
String bindingDestination = bindingServiceProperties.getBindingDestination(inboundName);
GlobalKTable<?, ?> table = materializedAs != null ?
materializedAsGlobalKTable(streamsBuilder, bindingDestination, materializedAs, keySerde, valueSerde) :
streamsBuilder.globalTable(bindingDestination,
Consumed.with(keySerde, valueSerde));
GlobalKTableBoundElementFactory.GlobalKTableWrapper globalKTableWrapper = (GlobalKTableBoundElementFactory.GlobalKTableWrapper) targetBean;
//wrap the proxy created during the initial target type binding with the real object (GlobalKTable)
globalKTableWrapper.wrap((GlobalKTable<Object, Object>) table);
kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
if (streamsConfig != null) {
kafkaStreamsBindingInformationCatalogue.addStreamsConfigs(globalKTableWrapper, streamsConfig);
}
arguments[parameterIndex] = table;
}
}
catch (Exception e) {
throw new IllegalStateException(e);
@@ -295,11 +319,19 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
private <K,V> KTable<K,V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v) {
return streamsBuilder.table(bindingServiceProperties.getBindingDestination(destination),
getMaterialized(storeName, k, v));
}
private <K,V> GlobalKTable<K,V> materializedAsGlobalKTable(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v) {
return streamsBuilder.globalTable(bindingServiceProperties.getBindingDestination(destination),
getMaterialized(storeName, k, v));
}
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(k)
.withValueSerde(v);
}
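// Usage sketch for the materialization helpers above (the property name comes from the
// materializedAs field declared in KafkaStreamsConsumerProperties later in this diff;
// the store name is illustrative):
//
//   spring.cloud.stream.kafka.streams.bindings.input.consumer.materializedAs=incoming-store
//
// materializes the bound KTable/GlobalKTable as a named, queryable state store.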
private StoreBuilder buildStateStore(KafkaStreamsStateStoreProperties spec) {
@@ -337,7 +369,6 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
}
private KStream<?, ?> getkStream(String inboundName, KafkaStreamsStateStoreProperties storeSpec,
BindingProperties bindingProperties, StreamsBuilder streamsBuilder,
Serde<?> keySerde, Serde<?> valueSerde) {
@@ -363,11 +394,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
stream = stream.mapValues(value -> {
Object returnValue;
String contentType = bindingProperties.getContentType();
if (value != null && !StringUtils.isEmpty(contentType) && !nativeDecoding) {
returnValue = MessageBuilder.withPayload(value)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
}
else {
returnValue = value;
}
return returnValue;
@@ -376,25 +407,33 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
private void enableNativeDecodingForKTableAlways(Class<?> parameterType, BindingProperties bindingProperties) {
if (parameterType.isAssignableFrom(KTable.class) || parameterType.isAssignableFrom(GlobalKTable.class)) {
if (bindingProperties.getConsumer() == null) {
bindingProperties.setConsumer(new ConsumerProperties());
}
//No framework-level message conversion is provided for KTable/GlobalKTable; it is done by the broker.
bindingProperties.getConsumer().setUseNativeDecoding(true);
}
}
@SuppressWarnings({"unchecked"})
private StreamsConfig buildStreamsBuilderAndRetrieveConfig(Method method, ApplicationContext applicationContext,
String inboundName) {
ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
Map<String, Object> streamConfigGlobalProperties = applicationContext.getBean("streamConfigGlobalProperties", Map.class);
KafkaStreamsConsumerProperties extendedConsumerProperties = kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
//Need to apply the default extended properties here as it is not yet done by the BindingService in the binding lifecycle.
handleExtendedDefaultProperties(kafkaStreamsExtendedBindingProperties,
extendedConsumerProperties);
String applicationId = extendedConsumerProperties.getApplicationId();
//override application.id if set at the individual binding level.
if (StringUtils.hasText(applicationId)) {
streamConfigGlobalProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
}
//Custom StreamsConfig implementation that overrides to guarantee that the deserialization handler is cached.
StreamsConfig streamsConfig = new StreamsConfig(streamConfigGlobalProperties) {
@@ -421,7 +460,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
streamsBuilder.setAutoStartup(false);
BeanDefinition streamsBuilderBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
BeanDefinition streamsConfigBeanDefinition =
@@ -433,6 +472,28 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
return streamsConfig;
}
// This method is mostly copied from core. We should refactor the original method in core so that it is publicly available
// as a utility method. This is currently hidden as a private method in BindingService.
private void handleExtendedDefaultProperties(KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
MergableProperties extendedProperties) {
String defaultsPrefix = kafkaStreamsExtendedBindingProperties.getDefaultsPrefix();
if (defaultsPrefix != null) {
Class<? extends BinderSpecificPropertiesProvider> extendedPropertiesEntryClass = kafkaStreamsExtendedBindingProperties.getExtendedPropertiesEntryClass();
if (BinderSpecificPropertiesProvider.class.isAssignableFrom(extendedPropertiesEntryClass)) {
org.springframework.boot.context.properties.bind.Binder extendedPropertiesResolverBinder =
new org.springframework.boot.context.properties.bind.Binder(ConfigurationPropertySources.get(applicationContext.getEnvironment()),
new PropertySourcesPlaceholdersResolver(applicationContext.getEnvironment()),
IntegrationUtils.getConversionService(this.applicationContext.getBeanFactory()), null);
BinderSpecificPropertiesProvider defaultProperties = BeanUtils.instantiateClass(extendedPropertiesEntryClass);
extendedPropertiesResolverBinder.bind(defaultsPrefix, Bindable.ofInstance(defaultProperties));
Object binderExtendedProperties = defaultProperties.getConsumer();
((MergableProperties)binderExtendedProperties).merge(extendedProperties);
}
}
}
@Override
public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;


@@ -35,7 +35,7 @@ public class KafkaStreamsBinderConfigurationProperties extends KafkaBinderConfig
sendToDlq
}
private String applicationId;
public String getApplicationId() {
return applicationId;


@@ -16,10 +16,12 @@
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/**
* @author Marius Bogoevici
*/
public class KafkaStreamsBindingProperties {
public class KafkaStreamsBindingProperties implements BinderSpecificPropertiesProvider {
private KafkaStreamsConsumerProperties consumer = new KafkaStreamsConsumerProperties();


@@ -24,6 +24,8 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
*/
public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
private String applicationId;
/**
* Key serde specified per binding.
*/
@@ -39,6 +41,14 @@ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
*/
private String materializedAs;
public String getApplicationId() {
return applicationId;
}
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}
public String getKeySerde() {
return keySerde;
}


@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
@@ -29,6 +30,8 @@ import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
public class KafkaStreamsExtendedBindingProperties
implements ExtendedBindingProperties<KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.kafka.streams.default";
private Map<String, KafkaStreamsBindingProperties> bindings = new HashMap<>();
public Map<String, KafkaStreamsBindingProperties> getBindings() {
@@ -58,4 +61,14 @@ public class KafkaStreamsExtendedBindingProperties
return new KafkaStreamsProducerProperties();
}
}
@Override
public String getDefaultsPrefix() {
return DEFAULTS_PREFIX;
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return KafkaStreamsBindingProperties.class;
}
}
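With the defaults prefix above, extended consumer properties can be declared once for all
bindings, and a binding-specific value still wins; that precedence is what
handleExtendedDefaultProperties in the orchestrator enforces. Both spellings appear in the
integration tests in this diff:

spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count
spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=basic-word-count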


@@ -2,5 +2,7 @@ kstream:\
org.springframework.cloud.stream.binder.kafka.streams.KStreamBinderConfiguration
ktable:\
org.springframework.cloud.stream.binder.kafka.streams.KTableBinderConfiguration
globalktable:\
org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBinderConfiguration
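The new globalktable entry registers a third bindable target type alongside kstream and
ktable. A minimal sketch of a bindable interface that uses it, modeled on the
CustomGlobalKTableProcessor from the join integration test later in this diff:

interface CustomGlobalKTableProcessor extends KafkaStreamsProcessor {
	@Input("inputX")
	GlobalKTable<?, ?> inputX();
}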


@@ -47,7 +47,8 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
@@ -68,9 +69,11 @@ import static org.mockito.Mockito.verify;
public abstract class DeserializationErrorHandlerByKafkaTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts", "error.words.group",
"error.word1.groupx", "error.word2.groupx");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@SpyBean
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
@@ -99,6 +102,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deser-kafka-dlq",
"spring.cloud.stream.bindings.input.group=group",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=" +
@@ -135,6 +139,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"spring.cloud.stream.bindings.input.destination=word1,word2",
"spring.cloud.stream.kafka.streams.default.consumer.applicationId=deser-kafka-dlq-multi-input",
"spring.cloud.stream.bindings.input.group=groupx",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=" +


@@ -42,7 +42,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
@@ -62,9 +63,11 @@ import static org.mockito.Mockito.verify;
public abstract class DeserializtionErrorHandlerByBinderTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts-id", "error.foos.foobar-group",
"error.foos1.fooz-group", "error.foos2.fooz-group");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@SpyBean
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
@@ -101,6 +104,7 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
"spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deserializationByBinderAndDlqTests",
"spring.cloud.stream.bindings.input.group=foobar-group"},
webEnvironment= SpringBootTest.WebEnvironment.NONE
)
@@ -135,10 +139,9 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.bindings.output.producer.headerMode=raw",
"spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deserializationByBinderAndDlqTestsWithMultipleInputs",
"spring.cloud.stream.bindings.input.group=fooz-group"},
webEnvironment= SpringBootTest.WebEnvironment.NONE
)


@@ -47,7 +47,8 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -67,7 +68,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class KafkaStreamsBinderMultipleInputTopicsTest {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, String> consumer;
@@ -98,10 +101,10 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=WordCountProcessorApplication-xyz",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {


@@ -43,7 +43,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -56,7 +57,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts-id");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<Integer, String> consumer;
@@ -86,9 +89,8 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests-xyz",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {


@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
@@ -23,6 +24,7 @@ import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -55,7 +57,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -69,7 +72,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class KafkaStreamsBinderWordCountIntegrationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, String> consumer;
@@ -88,15 +93,16 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
}
@Test
public void testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer() throws Exception {
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
@@ -105,32 +111,75 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString())) {
receiveAndValidate(context);
}
}
@Test
public void testKstreamWordCountWithInputBindingLevelApplicationId() throws Exception {
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=basic-word-count",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString())) {
receiveAndValidate(context);
//Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
assertThat(store).isNotNull();
sendTombStoneRecordsAndVerifyGracefulHandling();
CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean, "cleanupConfig",
CleanupConfig.class);
assertThat(cleanup.cleanupOnStart()).isTrue();
assertThat(cleanup.cleanupOnStop()).isFalse();
}
}
private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts");
assertThat(cr.value().contains("\"word\":\"foobar\",\"count\":1")).isTrue();
}
finally {
pf.destroy();
}
}
private void sendTombStoneRecordsAndVerifyGracefulHandling() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault(null);
ConsumerRecords<String, String> received = consumer.poll(Duration.ofMillis(5000));
//By asserting that no records were received, we ensure that the tombstone record
//was handled gracefully by the binder.
assertThat(received.isEmpty()).isTrue();
}
finally {
pf.destroy();
}
}
@EnableBinding(KafkaStreamsProcessor.class)
@@ -145,9 +194,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
@SendTo("output")
public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))


@@ -48,7 +48,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -61,7 +62,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class KafkaStreamsInteractiveQueryIntegrationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts-id");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, String> consumer;
@@ -90,8 +93,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=ProductCountApplication-abc",
"--spring.cloud.stream.kafka.streams.binder.configuration.application.server=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());


@@ -47,7 +47,8 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
@@ -68,7 +69,9 @@ import static org.mockito.Mockito.verify;
public abstract class KafkaStreamsNativeEncodingDecodingTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@SpyBean
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
@@ -97,7 +100,9 @@ public abstract class KafkaStreamsNativeEncodingDecodingTests {
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=true"},
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=NativeEncodingDecodingEnabledTests-abc"
},
webEnvironment= SpringBootTest.WebEnvironment.NONE
)
public static class NativeEncodingDecodingEnabledTests extends KafkaStreamsNativeEncodingDecodingTests {
@@ -117,7 +122,8 @@ public abstract class KafkaStreamsNativeEncodingDecodingTests {
}
}
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.NONE,
properties = "spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=NativeEncodingDecodingEnabledTests-xyz")
public static class NativeEncodingDecodingDisabledTests extends KafkaStreamsNativeEncodingDecodingTests {
@Test


@@ -37,7 +37,8 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,7 +50,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class KafkaStreamsStateStoreIntegrationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts-id");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@Test
public void testKstreamStateStore() throws Exception {
@@ -61,7 +64,7 @@ public class KafkaStreamsStateStoreIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=KafkaStreamsStateStoreIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {


@@ -45,7 +45,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -59,7 +60,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts-id");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, String> consumer;
@@ -88,9 +91,8 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=ProductCountApplication-xyz",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {


@@ -0,0 +1,322 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
public class StreamToGlobalKTableJoinIntegrationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "enriched-order");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<Long, EnrichedOrder> consumer;
interface CustomGlobalKTableProcessor extends KafkaStreamsProcessor {
@Input("inputX")
GlobalKTable<?, ?> inputX();
@Input("inputY")
GlobalKTable<?, ?> inputY();
}
@EnableBinding(CustomGlobalKTableProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
public static class OrderEnricherApplication {
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(@Input("input") KStream<Long, Order> ordersStream,
@Input("inputX") GlobalKTable<Long, Customer> customers,
@Input("inputY") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
}
@Test
public void testStreamToGlobalKTable() throws Exception {
SpringApplication app = new SpringApplication(StreamToGlobalKTableJoinIntegrationTests.OrderEnricherApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=orders",
"--spring.cloud.stream.bindings.inputX.destination=customers",
"--spring.cloud.stream.bindings.inputY.destination=products",
"--spring.cloud.stream.bindings.output.destination=enriched-order",
"--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true",
"--spring.cloud.stream.bindings.inputX.consumer.useNativeDecoding=true",
"--spring.cloud.stream.bindings.inputY.consumer.useNativeDecoding=true",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde=org.springframework.cloud.stream.binder.kafka.streams.integration.StreamToGlobalKTableJoinIntegrationTests$OrderSerde",
"--spring.cloud.stream.kafka.streams.bindings.inputX.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kafka.streams.bindings.inputX.consumer.valueSerde=org.springframework.cloud.stream.binder.kafka.streams.integration.StreamToGlobalKTableJoinIntegrationTests$CustomerSerde",
"--spring.cloud.stream.kafka.streams.bindings.inputY.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kafka.streams.bindings.inputY.consumer.valueSerde=org.springframework.cloud.stream.binder.kafka.streams.integration.StreamToGlobalKTableJoinIntegrationTests$ProductSerde",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=org.springframework.cloud.stream.binder.kafka.streams.integration.StreamToGlobalKTableJoinIntegrationTests$EnrichedOrderSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=StreamToGlobalKTableJoinIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString())) {
Map<String, Object> senderPropsCustomer = KafkaTestUtils.producerProps(embeddedKafka);
senderPropsCustomer.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
CustomerSerde customerSerde = new CustomerSerde();
senderPropsCustomer.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, customerSerde.serializer().getClass());
DefaultKafkaProducerFactory<Long, Customer> pfCustomer = new DefaultKafkaProducerFactory<>(senderPropsCustomer);
KafkaTemplate<Long, Customer> template = new KafkaTemplate<>(pfCustomer, true);
template.setDefaultTopic("customers");
for (long i = 0; i < 5; i++) {
final Customer customer = new Customer();
customer.setName("customer-" + i);
template.sendDefault(i, customer);
}
Map<String, Object> senderPropsProduct = KafkaTestUtils.producerProps(embeddedKafka);
senderPropsProduct.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
ProductSerde productSerde = new ProductSerde();
senderPropsProduct.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, productSerde.serializer().getClass());
DefaultKafkaProducerFactory<Long, Product> pfProduct = new DefaultKafkaProducerFactory<>(senderPropsProduct);
KafkaTemplate<Long, Product> productTemplate = new KafkaTemplate<>(pfProduct, true);
productTemplate.setDefaultTopic("products");
for (long i = 0; i < 5; i++) {
final Product product = new Product();
product.setName("product-" + i);
productTemplate.sendDefault(i, product);
}
Map<String, Object> senderPropsOrder = KafkaTestUtils.producerProps(embeddedKafka);
senderPropsOrder.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
OrderSerde orderSerde = new OrderSerde();
senderPropsOrder.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, orderSerde.serializer().getClass());
DefaultKafkaProducerFactory<Long, Order> pfOrder = new DefaultKafkaProducerFactory<>(senderPropsOrder);
KafkaTemplate<Long, Order> orderTemplate = new KafkaTemplate<>(pfOrder, true);
orderTemplate.setDefaultTopic("orders");
for (long i = 0; i < 5; i++) {
final Order order = new Order();
order.setCustomerId(i);
order.setProductId(i);
orderTemplate.sendDefault(i, order);
}
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
EnrichedOrderSerde enrichedOrderSerde = new EnrichedOrderSerde();
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, enrichedOrderSerde.deserializer().getClass());
consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "org.springframework.cloud.stream.binder.kafka.streams.integration.StreamToGlobalKTableJoinIntegrationTests.EnrichedOrder");
DefaultKafkaConsumerFactory<Long, EnrichedOrder> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "enriched-order");
int count = 0;
long start = System.currentTimeMillis();
List<KeyValue<Long, EnrichedOrder>> enrichedOrders = new ArrayList<>();
do {
ConsumerRecords<Long, EnrichedOrder> records = KafkaTestUtils.getRecords(consumer);
count = count + records.count();
for (ConsumerRecord<Long, EnrichedOrder> record : records) {
enrichedOrders.add(new KeyValue<>(record.key(), record.value()));
}
} while (count < 5 && (System.currentTimeMillis() - start) < 30000);
assertThat(count).isEqualTo(5);
assertThat(enrichedOrders).hasSize(5);
enrichedOrders.sort(Comparator.comparing(o -> o.key));
for (int i = 0; i < 5; i++) {
KeyValue<Long, EnrichedOrder> enrichedOrderKeyValue = enrichedOrders.get(i);
assertThat(enrichedOrderKeyValue.key).isEqualTo((long) i);
EnrichedOrder enrichedOrder = enrichedOrderKeyValue.value;
assertThat(enrichedOrder.getOrder().customerId).isEqualTo(i);
assertThat(enrichedOrder.getOrder().productId).isEqualTo(i);
assertThat(enrichedOrder.getCustomer().name).isEqualTo("customer-" + i);
assertThat(enrichedOrder.getProduct().name).isEqualTo("product-" + i);
}
pfCustomer.destroy();
pfProduct.destroy();
pfOrder.destroy();
consumer.close();
}
}
static class Order {
long customerId;
long productId;
public long getCustomerId() {
return customerId;
}
public void setCustomerId(long customerId) {
this.customerId = customerId;
}
public long getProductId() {
return productId;
}
public void setProductId(long productId) {
this.productId = productId;
}
}
static class Customer {
String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
static class Product {
String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
static class EnrichedOrder {
Product product;
Customer customer;
Order order;
public Product getProduct() {
return product;
}
public void setProduct(Product product) {
this.product = product;
}
public Customer getCustomer() {
return customer;
}
public void setCustomer(Customer customer) {
this.customer = customer;
}
public Order getOrder() {
return order;
}
public void setOrder(Order order) {
this.order = order;
}
}
private static class CustomerOrder {
private final Customer customer;
private final Order order;
CustomerOrder(final Customer customer, final Order order) {
this.customer = customer;
this.order = order;
}
long productId() {
return order.getProductId();
}
}
public static class OrderSerde extends JsonSerde<Order> {}
public static class CustomerSerde extends JsonSerde<Customer> {}
public static class ProductSerde extends JsonSerde<Product> {}
public static class EnrichedOrderSerde extends JsonSerde<EnrichedOrder> {}
}
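The test above exercises the new GlobalKTable binding support (GH-384). A minimal sketch of the kind of binding interface it binds against, assuming the binding names used in the test; the interface name itself is illustrative and not part of the commit:

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

interface CustomGlobalKTableProcessor {

    // regular KStream input for the order events
    @Input("input")
    KStream<Long, Order> ordersIn();

    // GlobalKTable inputs: fully replicated, read-only lookup tables
    @Input("inputX")
    GlobalKTable<Long, Customer> customersIn();

    @Input("inputY")
    GlobalKTable<Long, Product> productsIn();

    @Output("output")
    KStream<Long, EnrichedOrder> enrichedOut();
}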

View File

@@ -54,7 +54,8 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -66,7 +67,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class StreamToTableJoinIntegrationTests {
@ClassRule
-public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "output-topic");
+public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "output-topic");
+private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, Long> consumer;
@@ -113,11 +116,11 @@ public class StreamToTableJoinIntegrationTests {
}
@Test
-public void testStreamToTable() throws Exception {
+public void testStreamToTable() {
SpringApplication app = new SpringApplication(CountClicksPerRegionApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
-ConfigurableApplicationContext context = app.run("--server.port=0",
+try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=user-clicks",
"--spring.cloud.stream.bindings.inputX.destination=user-regions",
@@ -134,12 +137,9 @@ public class StreamToTableJoinIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.bindings.inputX.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=StreamToTableJoinIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString())) {
// Input 1: Clicks per user (multiple records allowed per user).
List<KeyValue<String, Long>> userClicks = Arrays.asList(
new KeyValue<>("alice", 13L),
@@ -160,7 +160,7 @@ public class StreamToTableJoinIntegrationTests {
KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("user-clicks");
-for (KeyValue<String,Long> keyValue : userClicks) {
+for (KeyValue<String, Long> keyValue : userClicks) {
template.sendDefault(keyValue.key, keyValue.value);
}
@@ -183,7 +183,7 @@ public class StreamToTableJoinIntegrationTests {
KafkaTemplate<String, String> template1 = new KafkaTemplate<>(pf1, true);
template1.setDefaultTopic("user-regions");
-for (KeyValue<String,String> keyValue : userRegions) {
+for (KeyValue<String, String> keyValue : userRegions) {
template1.sendDefault(keyValue.key, keyValue.value);
}
@@ -203,17 +203,11 @@ public class StreamToTableJoinIntegrationTests {
for (ConsumerRecord<String, Long> record : records) {
actualClicksPerRegion.add(new KeyValue<>(record.key(), record.value()));
}
-} while (count < expectedClicksPerRegion.size() && (System.currentTimeMillis() - start) < 30000 );
+} while (count < expectedClicksPerRegion.size() && (System.currentTimeMillis() - start) < 30000);
assertThat(count == expectedClicksPerRegion.size()).isTrue();
assertThat(actualClicksPerRegion).hasSameElementsAs(expectedClicksPerRegion);
}
-catch (Exception e){
-System.out.println(e);
-}
-finally {
-context.close();
-}
}
/**

View File

@@ -47,7 +47,8 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -61,7 +62,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class WordCountMultipleBranchesIntegrationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts","foo","bar");
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "counts","foo","bar");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, String> consumer;
@@ -139,10 +142,9 @@ public class WordCountMultipleBranchesIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=WordCountMultipleBranchesIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {

View File

@@ -4,7 +4,5 @@ spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
-spring.cloud.stream.bindings.output.producer.headerMode=raw
-spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.kafka.streams.timeWindow.length=5000
spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.0.M2</version>
+<version>2.1.0.M3</version>
</parent>
<dependencies>

View File

@@ -55,6 +55,7 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
+import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -167,9 +168,9 @@ public class KafkaMessageChannelBinder extends
super(headersToMap(configurationProperties), provisioningProvider, containerCustomizer);
this.configurationProperties = configurationProperties;
if (StringUtils.hasText(configurationProperties.getTransaction().getTransactionIdPrefix())) {
-this.transactionManager = new KafkaTransactionManager<>(
-getProducerFactory(configurationProperties.getTransaction().getTransactionIdPrefix(),
-new ExtendedProducerProperties<>(configurationProperties.getTransaction().getProducer())));
+this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
+configurationProperties.getTransaction().getTransactionIdPrefix(), new ExtendedProducerProperties<>(
+configurationProperties.getTransaction().getProducer().getExtension())));
}
else {
this.transactionManager = null;
@@ -214,6 +215,16 @@ public class KafkaMessageChannelBinder extends
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
+@Override
+public String getDefaultsPrefix() {
+return this.extendedBindingProperties.getDefaultsPrefix();
+}
+@Override
+public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
+return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
+}
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
@@ -339,6 +350,14 @@ public class KafkaMessageChannelBinder extends
return producerFactory;
}
+@Override
+protected boolean useNativeEncoding(ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
+if (this.transactionManager != null) {
+return this.configurationProperties.getTransaction().getProducer().isUseNativeEncoding();
+}
+return super.useNativeEncoding(producerProperties);
+}
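With this override in place, a transactional producer takes useNativeEncoding (and the other common producer properties) from the transaction sub-tree rather than from the individual binding, per GH-435. A configuration sketch; the property names are inferred from the getTransaction().getProducer() accessors in the diff above, so treat them as a sketch rather than a definitive reference:

# enables the transactional binder path (assumed property path)
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
# common producer property applied to the shared transactional producer (assumed property path)
spring.cloud.stream.kafka.binder.transaction.producer.useNativeEncoding=true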
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
@@ -657,6 +676,7 @@ public class KafkaMessageChannelBinder extends
return new RawRecordHeaderErrorMessageStrategy();
}
@SuppressWarnings("unchecked")
@Override
protected MessageHandler getErrorMessageHandler(final ConsumerDestination destination, final String group,
final ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
@@ -669,7 +689,7 @@ public class KafkaMessageChannelBinder extends
new ExtendedProducerProperties<>(dlqProducerProperties));
final KafkaTemplate<?,?> kafkaTemplate = new KafkaTemplate<>(producerFactory);
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings("rawtypes")
DlqSender<?,?> dlqSender = new DlqSender(kafkaTemplate);
return message -> {

View File

@@ -69,6 +69,7 @@ public class KafkaBinderConfiguration {
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
@SuppressWarnings("rawtypes")
@Autowired
private ProducerListener producerListener;
@@ -85,6 +86,7 @@ public class KafkaBinderConfiguration {
return new KafkaTopicProvisioner(configurationProperties, this.kafkaProperties);
}
@SuppressWarnings("unchecked")
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider, @Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer) {
@@ -96,6 +98,7 @@ public class KafkaBinderConfiguration {
return kafkaMessageChannelBinder;
}
@SuppressWarnings("rawtypes")
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
ProducerListener producerListener() {
@@ -154,6 +157,7 @@ public class KafkaBinderConfiguration {
}
@SuppressWarnings("unused")
public static class JaasConfigurationProperties {
private JaasLoginModuleConfiguration kafka;

View File

@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -75,6 +76,7 @@ import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
import org.springframework.cloud.stream.binder.PartitionTestSupport;
import org.springframework.cloud.stream.binder.PollableSource;
+import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
@@ -2533,6 +2535,38 @@ public class KafkaBinderTests extends
binding.unbind();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testPolledConsumerRequeue() throws Exception {
KafkaTestBinder binder = getBinder();
PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(this.messageConverter);
ExtendedConsumerProperties<KafkaConsumerProperties> properties = createConsumerProperties();
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollableRequeue", "group",
inboundBindTarget, properties);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send("pollableRequeue", "testPollable");
try {
boolean polled = false;
int n = 0;
while (n++ < 100 && !polled) {
polled = inboundBindTarget.poll(m -> {
assertThat(m.getPayload()).isEqualTo("testPollable".getBytes());
throw new RequeueCurrentMessageException();
});
}
fail("Expected exception");
}
catch (MessageHandlingException e) {
assertThat(e.getCause()).isInstanceOf(RequeueCurrentMessageException.class);
}
boolean polled = inboundBindTarget.poll(m -> {
assertThat(m.getPayload()).isEqualTo("testPollable".getBytes());
});
assertThat(polled).isTrue();
binding.unbind();
}
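The new test above pins down the requeue contract for polled consumers (GH-1459): throwing RequeueCurrentMessageException from the handler leaves the record unacknowledged, so a subsequent poll() delivers it again. A hypothetical application-side sketch; PollableMessageSource and RequeueCurrentMessageException are the actual spring-cloud-stream types, while the class name, schedule, and processing logic are illustrative:

import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.scheduling.annotation.Scheduled;

public class RequeueingPoller {

    private final PollableMessageSource input;

    public RequeueingPoller(PollableMessageSource input) {
        this.input = input;
    }

    @Scheduled(fixedDelay = 1000)
    public void poll() {
        this.input.poll(message -> {
            if (!tryProcess(message.getPayload())) {
                // Not acknowledged: the binder re-seeks the partition so the
                // same record comes back on a later poll.
                throw new RequeueCurrentMessageException();
            }
        });
    }

    private boolean tryProcess(Object payload) {
        return true; // illustrative application logic
    }
}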
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testPolledConsumerWithDlq() throws Exception {

View File

@@ -33,11 +33,13 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerPro
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.support.RetryTemplate;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.willReturn;
@@ -63,6 +65,7 @@ public class KafkaTransactionTests {
KafkaBinderConfigurationProperties configurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
+configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(configurationProperties, kafkaProperties);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
final Producer mockProducer = mock(Producer.class);
@@ -93,6 +96,8 @@ public class KafkaTransactionTests {
inOrder.verify(mockProducer).commitTransaction();
inOrder.verify(mockProducer).close();
inOrder.verifyNoMoreInteractions();
+assertThat(TestUtils.getPropertyValue(channel, "dispatcher.theOneHandler.useNativeEncoding", Boolean.class))
+.isTrue();
}
}

View File

@@ -0,0 +1,155 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = {"spring.cloud.stream.kafka.bindings.output.producer.configuration.key.serializer=FooSerializer.class",
"spring.cloud.stream.kafka.default.producer.configuration.key.serializer=BarSerializer.class",
"spring.cloud.stream.kafka.default.producer.configuration.value.serializer=BarSerializer.class",
"spring.cloud.stream.kafka.bindings.input.consumer.configuration.key.serializer=FooSerializer.class",
"spring.cloud.stream.kafka.default.consumer.configuration.key.serializer=BarSerializer.class",
"spring.cloud.stream.kafka.default.consumer.configuration.value.serializer=BarSerializer.class",
"spring.cloud.stream.kafka.default.producer.configuration.foo=bar",
"spring.cloud.stream.kafka.bindings.output.producer.configuration.foo=bindingSpecificPropertyShouldWinOverDefault"})
public class KafkaBinderExtendedPropertiesTest {
private static final String KAFKA_BROKERS_PROPERTY = "spring.cloud.stream.kafka.binder.brokers";
private static final String ZK_NODE_PROPERTY = "spring.cloud.stream.kafka.binder.zkNodes";
@ClassRule
public static EmbeddedKafkaRule kafkaEmbedded = new EmbeddedKafkaRule(1, true);
@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
System.setProperty(ZK_NODE_PROPERTY, kafkaEmbedded.getEmbeddedKafka().getZookeeperConnectionString());
}
@AfterClass
public static void clean() {
System.clearProperty(KAFKA_BROKERS_PROPERTY);
System.clearProperty(ZK_NODE_PROPERTY);
}
@Autowired
private ConfigurableApplicationContext context;
@Test
public void testKafkaBinderExtendedProperties() {
BinderFactory binderFactory = context.getBeanFactory().getBean(BinderFactory.class);
Binder<MessageChannel, ? extends ConsumerProperties, ? extends ProducerProperties> kafkaBinder =
binderFactory.getBinder("kafka", MessageChannel.class);
KafkaProducerProperties kafkaProducerProperties =
(KafkaProducerProperties)((ExtendedPropertiesBinder) kafkaBinder).getExtendedProducerProperties("output");
//binding "output" gets FooSerializer defined on the binding itself and BarSerializer through default property.
assertThat(kafkaProducerProperties.getConfiguration().get("key.serializer")).isEqualTo("FooSerializer.class");
assertThat(kafkaProducerProperties.getConfiguration().get("value.serializer")).isEqualTo("BarSerializer.class");
assertThat(kafkaProducerProperties.getConfiguration().get("foo")).isEqualTo("bindingSpecificPropertyShouldWinOverDefault");
KafkaConsumerProperties kafkaConsumerProperties =
(KafkaConsumerProperties)((ExtendedPropertiesBinder) kafkaBinder).getExtendedConsumerProperties("input");
//binding "input" gets FooSerializer defined on the binding itself and BarSerializer through default property.
assertThat(kafkaConsumerProperties.getConfiguration().get("key.serializer")).isEqualTo("FooSerializer.class");
assertThat(kafkaConsumerProperties.getConfiguration().get("value.serializer")).isEqualTo("BarSerializer.class");
KafkaProducerProperties customKafkaProducerProperties =
(KafkaProducerProperties)((ExtendedPropertiesBinder) kafkaBinder).getExtendedProducerProperties("custom-out");
//binding "output" gets BarSerializer and BarSerializer for ker.serializer/value.serializer through default properties.
assertThat(customKafkaProducerProperties.getConfiguration().get("key.serializer")).isEqualTo("BarSerializer.class");
assertThat(customKafkaProducerProperties.getConfiguration().get("value.serializer")).isEqualTo("BarSerializer.class");
//through default properties.
assertThat(customKafkaProducerProperties.getConfiguration().get("foo")).isEqualTo("bar");
KafkaConsumerProperties customKafkaConsumerProperties =
(KafkaConsumerProperties)((ExtendedPropertiesBinder) kafkaBinder).getExtendedConsumerProperties("custom-in");
//binding "input" gets BarSerializer and BarSerializer for ker.serializer/value.serializer through default properties.
assertThat(customKafkaConsumerProperties.getConfiguration().get("key.serializer")).isEqualTo("BarSerializer.class");
assertThat(customKafkaConsumerProperties.getConfiguration().get("value.serializer")).isEqualTo("BarSerializer.class");
}
@EnableBinding(CustomBindingForExtendedPropertyTesting.class)
@EnableAutoConfiguration
public static class KafkaMetricsTestConfig {
@StreamListener(Sink.INPUT)
@SendTo(Processor.OUTPUT)
public String process(String payload) throws InterruptedException {
return payload;
}
@StreamListener("custom-in")
@SendTo("custom-out")
public String processCustom(String payload) throws InterruptedException {
return payload;
}
}
interface CustomBindingForExtendedPropertyTesting extends Processor {
@Output("custom-out")
MessageChannel customOut();
@Input("custom-in")
SubscribableChannel customIn();
}
}
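The defaults precedence this test asserts reduces to two rules: an entry under spring.cloud.stream.kafka.default.* applies to every binding, and an entry under spring.cloud.stream.kafka.bindings.<bindingName>.* overrides it for that binding alone. Condensed from the test properties above (values are the test's own placeholders):

# applies to every binding unless overridden
spring.cloud.stream.kafka.default.producer.configuration.foo=bar
# wins over the default, but only for the "output" binding
spring.cloud.stream.kafka.bindings.output.producer.configuration.foo=bindingSpecificPropertyShouldWinOverDefault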