Compare commits

..

1 Commits

Author SHA1 Message Date
buildmaster
c725fd1ba4 Update SNAPSHOT to 2.2.0.M1 2019-03-25 16:04:30 +00:00
33 changed files with 103 additions and 930 deletions

View File

@@ -132,7 +132,7 @@ If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.
+
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
+
Default: `true`.
@@ -162,9 +162,6 @@ Default: none.
[[kafka-consumer-properties]]
==== Kafka Consumer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
@@ -287,9 +284,6 @@ Default: none (the binder-wide default of 1 is used).
[[kafka-producer-properties]]
==== Kafka Producer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.0.M1</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -124,10 +124,10 @@
<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
<attributes>
<spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
<docs-url>https://cloud.spring.io/</docs-url>
<docs-version></docs-version>
<!-- <docs-version>${project.version}/</docs-version> -->
<!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
<!-- <docs-url>https://cloud.spring.io/</docs-url> -->
<!-- <docs-version></docs-version> -->
<docs-version>${project.version}/</docs-version>
<docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
</attributes>
</configuration>
<executions>

View File

@@ -44,7 +44,7 @@ time window, and the computed results are sent to a downstream topic (e.g., `cou
[source]
----
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@@ -722,241 +722,3 @@ For more details about the health information, see the
https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html#production-ready-health[Spring Boot Actuator documentation].
NOTE: The status of the health indicator is `UP` if all the Kafka threads registered are in the `RUNNING` state.
=== Functional Kafka Streams Applications
Starting 2.2.0.RELEASE, Kafka Streams binder supports the ability to write Kafka Streams applications by simply just implementing the java.util.function.Function or java.util.consumer.Consumer interfaces in Java.
In this section, we will see the details of how the functional support work in the binder.
The above `StreamListener` based model can be converted as below.
[source]
----
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@Bean
public Function<KStream<?, String>, KStream<?, WordCount>> process() {
return input ->
input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
----
The input will be received from the input binding defined in the `KafkaStreamsProcessor` interface and the output will be sent to the output binding.
In this case, the input is a stream of `String` objects and the output is a stream of `WordCount` objects.
If the processor does not send any data on the outbound, then this becomes a plain Consumer bean as below.
In this example, we are simply receiving some data as a stream of `String` objects (`KStream<?, String>`) and possibly doing terminal operations with that data without sending any outputs.
[source]
----
@Bean
public Consumer<KStream<?, String>> process() {
return input ->
....
}
----
Applications are free to define custom bindings and use that instead of the out of the box `KafkaStreamsProcessor` interface.
==== Functions with multiple input bindings
With `StreamListener`, we define multiple `Input` bindings and then later on use them as inputs in the method.
With the functions approach, we still need to define those bindings in the binding interface. However, it cannot be used in the same way as in a `StreamListener` method.
We use curried functions to represent multiple input destinations in the same processor.
For instance, if a function has 2 inputs, the application define 2 partial functions in the function bean method. Lets see some examples.
[source]
----
@Bean
public Function<KStream<String, Long>,
Function<KTable<String, String>, KStream<String, Long>>> process() {
return userClicksStream ->
(userRegionsTable ->
(userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
.toStream()));
}
----
In the above function bean, there are two inputs and one output.
The function that returns from the method takes a `KStream` as input, but if you look at the output of this function,, that is another function
which takes a `KTable` as its input. The output of this second function is a `KStream` which becomes the output of the processor.
Another way to look at this is like the following:
Function 1: Function<KStream<String, Long> - KStream input; returns the output of "Function 2"
Function 2: Function<KTable<String, String>, KStream<String, Long>> - KTable input; returns KStream<String, Long> which becomes the output of the processor.
Both inputs are available as references in the method body and the applications can perform various operations on them.
In this example we use function currying on two partial functions.
One thing to keep in mind is that the input bindings must follow a natural order of sorting when you have multiple input bindings, otherwise the binder won't know which binding to bind for the various function inputs.
Here is the corresponding binding interface for the above processor.
[source]
----
interface KStreamKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
KTable<?, ?> input2();
@Output("output")
KStream<?, ?> output();
}
----
If you look at the 2 inputs, there is a natural sorting order - i.e. input-1 goes to the first partial function input and input-2 goes to the second partial function.
Here is another example that shows multiple inputs with GlobalKTable.
[source]
----
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
return orderStream -> (
customers -> (
products -> (
orderStream.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
----
Here we have 3 inputs. The first function takes a `KStream` and its output is another `Function` that takes a `GlobalKTable` as its input and another function as its output.
This last function takes another `GlobalKTable` as its input and a `KStream` is provided as this function's output which will be used as the processor's output.
Here is a sequential way to conceptualize this:
Function 1: Function<KStream<Long, Order> - KStream input; returns the output of "Function 2"
Function 2: Function<GlobalKTable<Long, Customer>, KStream<String, Long>> - GlobalKTable input; returns the output of "Function 3"
Function 3: Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>> - GlobalKTable input; returns KStream which becomes the output of the processor.
In this example, we have three curried functions. Behind the scenes, the binder will call the `apply` method on those functions in the order that they appear.
Here is the corresponding binding interface for this application.
[source]
----
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
----
Here also, the input bindings follow a natural order.
==== Multiple functions in the same application
Multiple functions aan be defined in the same application.
When doing this, the binder will do a natural sorting on multiple function bean names first and then apply input and output bindings on them in the natural order.
Consider the following two function beans in the same application.
[source]
----
@Bean
public Function<KStream<?, String>, KStream<?, WordCount>> process1() {
}
@Bean
public Function<KStream<?, String>, KStream<?, WordCount>> process2() {
}
----
Consider also the following binding interface.
[source]
----
interface Bindings {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
KStream<?, ?> input2();
@Ouput("output-1")
KStream<?, ?> output1();
@Output("output-2")
KStream<?, ?> output1();
}
----
Binder will first take the method `process1` and use input binding `input-1` and output binding `output-1`.
Similarly, for the method `process2`, it will use input binding `input-2` and output binding `output-2`.
==== Using custom state stores in functional applications
You can define custom state stores as beans in your application and those will be detected and added to the Kafka Streams builder by the binder.
Note that, for regular StreamListener based processors, you still need to use the `KafkaStreamsStateStore` annotation for custom state stores.
Here is an example of using custom state stores with functional style described in this section.
[source]
----
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
----
These state stores can be then accessed by the applications directly.

View File

@@ -112,7 +112,7 @@ If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.
+
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
+
Default: `true`.
@@ -142,9 +142,6 @@ Default: none.
[[kafka-consumer-properties]]
==== Kafka Consumer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
@@ -267,9 +264,6 @@ Default: none (the binder-wide default of 1 is used).
[[kafka-producer-properties]]
==== Kafka Producer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.

View File

@@ -2,7 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.0.M1</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.2.0.RELEASE</spring-cloud-stream.version>
<spring-cloud-stream.version>2.2.0.M1</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.0.M1</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -25,8 +25,6 @@ import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -58,8 +56,6 @@ public class KafkaBinderConfigurationProperties {
private static final String DEFAULT_KAFKA_CONNECTION_STRING = "localhost:9092";
private final Log logger = LogFactory.getLog(getClass());
private final Transaction transaction = new Transaction();
private final KafkaProperties kafkaProperties;
@@ -533,7 +529,6 @@ public class KafkaBinderConfigurationProperties {
}
}
consumerConfiguration.putAll(this.consumerProperties);
filterStreamManagedConfiguration(consumerConfiguration);
// Override Spring Boot bootstrap server setting if left to default with the value
// configured in the binder
return getConfigurationWithBootstrapServer(consumerConfiguration,
@@ -564,25 +559,6 @@ public class KafkaBinderConfigurationProperties {
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
private void filterStreamManagedConfiguration(Map<String, Object> configuration) {
if (configuration.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
&& configuration.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals(true)) {
logger.warn(constructIgnoredConfigMessage(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) +
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "=true is not supported by the Kafka binder");
configuration.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
if (configuration.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
logger.warn(constructIgnoredConfigMessage(ConsumerConfig.GROUP_ID_CONFIG) +
"Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify " +
"the group instead of " + ConsumerConfig.GROUP_ID_CONFIG);
configuration.remove(ConsumerConfig.GROUP_ID_CONFIG);
}
}
private String constructIgnoredConfigMessage(String config) {
return String.format("Ignoring provided value(s) for '%s'. ", config);
}
private Map<String, Object> getConfigurationWithBootstrapServer(
Map<String, Object> configuration, String bootstrapServersConfig) {
if (ObjectUtils.isEmpty(configuration.get(bootstrapServersConfig))) {

View File

@@ -1,108 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import static org.assertj.core.api.Assertions.assertThat;
public class KafkaBinderConfigurationPropertiesTest {
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setGroupId("group1");
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setEnableAutoCommit(true);
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.0.M1</version>
</parent>
<properties>

View File

@@ -27,12 +27,10 @@ import org.springframework.context.annotation.Configuration;
/**
* Application support configuration for Kafka Streams binder.
*
* @deprecated Features provided on this class can be directly configured in the application itself using Kafka Streams.
* @author Soby Chacko
*/
@Configuration
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
@Deprecated
public class KafkaStreamsApplicationSupportAutoConfiguration {
@Bean

View File

@@ -31,13 +31,12 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.stream.binder.BinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.function.FunctionDetectorCondition;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.serde.CompositeNonNativeSerde;
@@ -49,7 +48,6 @@ import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
@@ -242,7 +240,8 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
}
@Bean
@Conditional(FunctionDetectorCondition.class)
// @ConditionalOnProperty("spring.cloud.stream.kafka.streams.function.definition")
@ConditionalOnProperty("spring.cloud.stream.function.definition")
public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
@@ -303,7 +302,6 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
@Bean
@SuppressWarnings("unchecked")
@ConditionalOnMissingBean
public KeyValueSerdeResolver keyValueSerdeResolver(
@Qualifier("streamConfigGlobalProperties") Object streamConfigGlobalProperties,
KafkaStreamsBinderConfigurationProperties properties) {

View File

@@ -40,7 +40,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
@@ -49,7 +48,6 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.core.FluxedFunction;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
@@ -69,12 +67,10 @@ import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* @author Soby Chacko
* @since 2.2.0
*/
public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
@@ -92,8 +88,6 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
private ConfigurableApplicationContext applicationContext;
private Set<String> origInputs = new TreeSet<>();
private Set<String> origOutputs = new TreeSet<>();
public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
@@ -111,61 +105,50 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
this.cleanupConfig = cleanupConfig;
this.functionCatalog = functionCatalog;
this.bindableProxyFactory = bindableProxyFactory;
this.origInputs.addAll(this.bindableProxyFactory.getInputs());
this.origOutputs.addAll(this.bindableProxyFactory.getOutputs());
}
private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType) {
int inputCount = 1;
final Set<String> inputs = new TreeSet<>(this.bindableProxyFactory.getInputs());
ResolvableType resolvableTypeGeneric = resolvableType.getGeneric(1);
while (resolvableTypeGeneric != null && resolvableTypeGeneric.getRawClass() != null && (resolvableTypeGeneric.getRawClass().equals(Function.class) ||
resolvableTypeGeneric.getRawClass().equals(Consumer.class))) {
inputCount++;
resolvableTypeGeneric = resolvableTypeGeneric.getGeneric(1);
}
final Set<String> inputs = new TreeSet<>(origInputs);
Map<String, ResolvableType> resolvableTypeMap = new LinkedHashMap<>();
Map<String, ResolvableType> map = new LinkedHashMap<>();
final Iterator<String> iterator = inputs.iterator();
final String next = iterator.next();
resolvableTypeMap.put(next, resolvableType.getGeneric(0));
origInputs.remove(next);
if (iterator.hasNext()) {
map.put(iterator.next(), resolvableType.getGeneric(0));
ResolvableType generic = resolvableType.getGeneric(1);
for (int i = 1; i < inputCount; i++) {
if (iterator.hasNext()) {
ResolvableType generic = resolvableType.getGeneric(1);
while (iterator.hasNext() && generic != null) {
if (generic.getRawClass() != null &&
(generic.getRawClass().equals(Function.class) ||
generic.getRawClass().equals(Consumer.class))) {
final String next1 = iterator.next();
resolvableTypeMap.put(next1, generic.getGeneric(0));
origInputs.remove(next1);
map.put(iterator.next(), generic.getGeneric(0));
}
generic = generic.getGeneric(1);
}
}
return resolvableTypeMap;
return map;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void orchestrateFunctionInvoking(ResolvableType resolvableType, String functionName) {
@SuppressWarnings("unchecked")
public void orchestrateStreamListenerSetupMethod(ResolvableType resolvableType, String functionName) {
final Set<String> outputs = new TreeSet<>(this.bindableProxyFactory.getOutputs());
String[] methodAnnotatedOutboundNames = new String[outputs.size()];
int j = 0;
for (String output : outputs) {
methodAnnotatedOutboundNames[j++] = output;
}
final Map<String, ResolvableType> stringResolvableTypeMap = buildTypeMap(resolvableType);
Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(stringResolvableTypeMap, functionName);
Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(stringResolvableTypeMap, "foobar");
try {
if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(Consumer.class)) {
FluxedConsumer fluxedConsumer = functionCatalog.lookup(FluxedConsumer.class, functionName);
Assert.isTrue(fluxedConsumer != null,
"No corresponding consumer beans found in the catalog");
Object target = fluxedConsumer.getTarget();
Consumer<Object> consumer = Consumer.class.isAssignableFrom(target.getClass()) ? (Consumer) target : null;
if (consumer != null) {
consumer.accept(adaptedInboundArguments[0]);
}
Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, functionName);
consumer.accept(adaptedInboundArguments[0]);
}
else {
Function<Object, Object> function = functionCatalog.lookup(Function.class, functionName);
Object target = null;
if (function instanceof FluxedFunction) {
@@ -185,20 +168,15 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
i++;
}
if (result != null) {
final Set<String> outputs = new TreeSet<>(origOutputs);
final Iterator<String> iterator = outputs.iterator();
if (result.getClass().isArray()) {
final int length = ((Object[]) result).length;
String[] methodAnnotatedOutboundNames = new String[length];
for (int j = 0; j < length; j++) {
if (iterator.hasNext()) {
final String next = iterator.next();
methodAnnotatedOutboundNames[j] = next;
this.origOutputs.remove(next);
}
}
Assert.isTrue(methodAnnotatedOutboundNames.length == ((Object[]) result).length,
"Result does not match with the number of declared outbounds");
}
else {
Assert.isTrue(methodAnnotatedOutboundNames.length == 1,
"Result does not match with the number of declared outbounds");
}
if (result.getClass().isArray()) {
Object[] outboundKStreams = (Object[]) result;
int k = 0;
for (Object outboundKStream : outboundKStreams) {
@@ -210,15 +188,11 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
}
}
else {
if (iterator.hasNext()) {
final String next = iterator.next();
Object targetBean = this.applicationContext.getBean(next);
this.origOutputs.remove(next);
Object targetBean = this.applicationContext.getBean(methodAnnotatedOutboundNames[0]);
KStreamBoundElementFactory.KStreamWrapper
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
boundElement.wrap((KStream) result);
}
KStreamBoundElementFactory.KStreamWrapper
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
boundElement.wrap((KStream) result);
}
}
}
@@ -253,6 +227,7 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
KafkaStreamsConsumerProperties extendedConsumerProperties =
this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(input);
//get state store spec
//KafkaStreamsStateStoreProperties spec = buildStateStoreSpec(method);
Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties);
Serde<?> valueSerde = this.keyValueSerdeResolver.getInboundValueSerde(
bindingProperties.getConsumer(), extendedConsumerProperties);
@@ -389,21 +364,6 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
BindingProperties bindingProperties,
StreamsBuilder streamsBuilder,
Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
try {
final Map<String, StoreBuilder> storeBuilders = applicationContext.getBeansOfType(StoreBuilder.class);
if (!CollectionUtils.isEmpty(storeBuilders)) {
storeBuilders.values().forEach(storeBuilder -> {
streamsBuilder.addStateStore(storeBuilder);
if (LOG.isInfoEnabled()) {
LOG.info("state store " + storeBuilder.name() + " added to topology");
}
});
}
}
catch (Exception e) {
// Pass through.
}
String[] bindingTargets = StringUtils
.commaDelimitedListToStringArray(this.bindingServiceProperties.getBindingDestination(inboundName));

View File

@@ -19,12 +19,10 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
@@ -79,15 +77,14 @@ public class KafkaStreamsMessageConversionDelegate {
* @param outboundBindTarget outbound KStream target
* @return serialized KStream
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@SuppressWarnings("rawtypes")
public KStream serializeOnOutbound(KStream<?, ?> outboundBindTarget) {
String contentType = this.kstreamBindingInformationCatalogue
.getContentType(outboundBindTarget);
MessageConverter messageConverter = this.compositeMessageConverterFactory
.getMessageConverterForAllRegistered();
final PerRecordContentTypeHolder perRecordContentTypeHolder = new PerRecordContentTypeHolder();
final KStream<?, ?> kStreamWithEnrichedHeaders = outboundBindTarget.mapValues((v) -> {
return outboundBindTarget.mapValues((v) -> {
Message<?> message = v instanceof Message<?> ? (Message<?>) v
: MessageBuilder.withPayload(v).build();
Map<String, Object> headers = new HashMap<>(message.getHeaders());
@@ -95,46 +92,9 @@ public class KafkaStreamsMessageConversionDelegate {
headers.put(MessageHeaders.CONTENT_TYPE, contentType);
}
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> convertedMessage = messageConverter.toMessage(message.getPayload(), messageHeaders);
perRecordContentTypeHolder.setContentType((String) messageHeaders.get(MessageHeaders.CONTENT_TYPE));
return convertedMessage.getPayload();
return messageConverter.toMessage(message.getPayload(), messageHeaders)
.getPayload();
});
kStreamWithEnrichedHeaders.process(() -> new Processor() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, Object value) {
if (perRecordContentTypeHolder.contentType != null) {
this.context.headers().remove(MessageHeaders.CONTENT_TYPE);
final Header header;
try {
header = new RecordHeader(MessageHeaders.CONTENT_TYPE,
new ObjectMapper().writeValueAsBytes(perRecordContentTypeHolder.contentType));
this.context.headers().add(header);
}
catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not add content type header");
}
}
perRecordContentTypeHolder.unsetContentType();
}
}
@Override
public void close() {
}
});
return kStreamWithEnrichedHeaders;
}
/**
@@ -323,10 +283,6 @@ public class KafkaStreamsMessageConversionDelegate {
this.contentType = contentType;
}
void unsetContentType() {
this.contentType = null;
}
}
}

View File

@@ -569,11 +569,10 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
(Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(),
() -> streamsBuilder)
.getRawBeanDefinition();
final String beanNamePostFix = method.getDeclaringClass().getSimpleName() + "-" + method.getName();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(
"stream-builder-" + beanNamePostFix, streamsBuilderBeanDefinition);
"stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean(
"&stream-builder-" + beanNamePostFix, StreamsBuilderFactoryBean.class);
"&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
}

View File

@@ -53,7 +53,7 @@ import org.springframework.util.StringUtils;
* @author Soby Chacko
* @author Lei Chen
*/
public class KeyValueSerdeResolver {
class KeyValueSerdeResolver {
private final Map<String, Object> streamConfigGlobalProperties;

View File

@@ -1,87 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.ClassUtils;
/**
* Custom {@link org.springframework.context.annotation.Condition} that detects the presence
* of java.util.Function|Consumer beans. Used for Kafka Streams function support.
*
* @author Soby Chakco
* @since 2.2.0
*/
public class FunctionDetectorCondition extends SpringBootCondition {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
if (context != null && context.getBeanFactory() != null) {
Map functionTypes = context.getBeanFactory().getBeansOfType(Function.class);
functionTypes.putAll(context.getBeanFactory().getBeansOfType(Consumer.class));
final Map<String, Object> kstreamFunctions = pruneFunctionBeansForKafkaStreams(functionTypes, context);
if (!kstreamFunctions.isEmpty()) {
return ConditionOutcome.match("Matched. Function/Consumer beans found");
}
else {
return ConditionOutcome.noMatch("No match. No Function/Consumer beans found");
}
}
return ConditionOutcome.noMatch("No match. No Function/Consumer beans found");
}
private static <T> Map<String, T> pruneFunctionBeansForKafkaStreams(Map<String, T> originalFunctionBeans,
ConditionContext context) {
final Map<String, T> prunedMap = new HashMap<>();
for (String key : originalFunctionBeans.keySet()) {
final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
context.getBeanFactory().getBeanDefinition(key))
.getMetadata().getClassName(),
ClassUtils.getDefaultClassLoader());
try {
Method method = classObj.getMethod(key);
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
final Class<?> rawClass = resolvableType.getGeneric(0).getRawClass();
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
prunedMap.put(key, originalFunctionBeans.get(key));
}
}
catch (NoSuchMethodException e) {
//ignore
}
}
return prunedMap;
}
}

View File

@@ -16,32 +16,34 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
/**
* @author Soby Chacko
* @since 2.2.0
*/
@Configuration
@ConditionalOnProperty("spring.cloud.stream.function.definition")
@EnableConfigurationProperties(StreamFunctionProperties.class)
public class KafkaStreamsFunctionAutoConfiguration {
@Bean
@Conditional(FunctionDetectorCondition.class)
public KafkaStreamsFunctionProcessorInvoker kafkaStreamsFunctionProcessorInvoker(
KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor,
KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor) {
return new KafkaStreamsFunctionProcessorInvoker(kafkaStreamsFunctionBeanPostProcessor.getResolvableTypes(),
kafkaStreamsFunctionProcessor);
KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor,
StreamFunctionProperties properties) {
return new KafkaStreamsFunctionProcessorInvoker(kafkaStreamsFunctionBeanPostProcessor.getResolvableType(),
properties.getDefinition(), kafkaStreamsFunctionProcessor);
}
@Bean
public KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor() {
return new KafkaStreamsFunctionBeanPostProcessor();
public KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor(
StreamFunctionProperties properties) {
return new KafkaStreamsFunctionBeanPostProcessor(properties);
}
}

View File

@@ -17,11 +17,6 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
@@ -29,42 +24,40 @@ import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.ResolvableType;
import org.springframework.util.ClassUtils;
/**
*
* @author Soby Chacko
* @since 2.2.0
* @since 2.1.0
*
*/
class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware {
private final StreamFunctionProperties kafkaStreamsFunctionProperties;
private ConfigurableListableBeanFactory beanFactory;
private Map<String, ResolvableType> resolvableTypeMap = new TreeMap<>();
private ResolvableType resolvableType;
public Map<String, ResolvableType> getResolvableTypes() {
return this.resolvableTypeMap;
KafkaStreamsFunctionBeanPostProcessor(StreamFunctionProperties properties) {
this.kafkaStreamsFunctionProperties = properties;
}
public ResolvableType getResolvableType() {
return this.resolvableType;
}
@Override
public void afterPropertiesSet() {
String[] functionNames = this.beanFactory.getBeanNamesForType(Function.class);
String[] consumerNames = this.beanFactory.getBeanNamesForType(Consumer.class);
Stream.concat(Stream.of(functionNames), Stream.of(consumerNames)).forEach(this::extractResolvableTypes);
}
private void extractResolvableTypes(String key) {
public void afterPropertiesSet() throws Exception {
final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
this.beanFactory.getBeanDefinition(key))
this.beanFactory.getBeanDefinition(kafkaStreamsFunctionProperties.getDefinition()))
.getMetadata().getClassName(),
ClassUtils.getDefaultClassLoader());
try {
Method method = classObj.getMethod(key);
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
resolvableTypeMap.put(key, resolvableType);
Method method = classObj.getMethod(this.kafkaStreamsFunctionProperties.getDefinition());
this.resolvableType = ResolvableType.forMethodReturnType(method, classObj);
}
catch (NoSuchMethodException e) {
//ignore

View File

@@ -16,8 +16,6 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor;
@@ -31,17 +29,18 @@ import org.springframework.core.ResolvableType;
class KafkaStreamsFunctionProcessorInvoker {
private final KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor;
private final Map<String, ResolvableType> resolvableTypeMap;
private final ResolvableType resolvableType;
private final String functionName;
KafkaStreamsFunctionProcessorInvoker(Map<String, ResolvableType> resolvableTypeMap,
KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor) {
KafkaStreamsFunctionProcessorInvoker(ResolvableType resolvableType, String functionName,
KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor) {
this.kafkaStreamsFunctionProcessor = kafkaStreamsFunctionProcessor;
this.resolvableTypeMap = resolvableTypeMap;
this.resolvableType = resolvableType;
this.functionName = functionName;
}
@PostConstruct
void invoke() {
resolvableTypeMap.forEach((key, value) ->
this.kafkaStreamsFunctionProcessor.orchestrateFunctionInvoking(value, key));
this.kafkaStreamsFunctionProcessor.orchestrateStreamListenerSetupMethod(resolvableType, functionName);
}
}

View File

@@ -26,7 +26,6 @@ import org.springframework.cloud.function.context.WrapperDetector;
/**
* @author Soby Chacko
* @since 2.2.0
*/
public class KafkaStreamsFunctionWrapperDetector implements WrapperDetector {
@Override

View File

@@ -25,11 +25,9 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
* stream processing and one can provide window specific properties at runtime and use
* those properties in the applications using this class.
*
* @deprecated The properties exposed by this class can be used directly on Kafka Streams API in the application.
* @author Soby Chacko
*/
@ConfigurationProperties("spring.cloud.stream.kafka.streams")
@Deprecated
public class KafkaStreamsApplicationSupportProperties {
private TimeWindow timeWindow;

View File

@@ -85,6 +85,7 @@ public class KafkaStreamsBinderWordCountBranchesFunctionTests {
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output1.destination=counts",
"--spring.cloud.stream.bindings.output1.contentType=application/json",

View File

@@ -85,6 +85,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
@@ -185,4 +186,5 @@ public class KafkaStreamsBinderWordCountFunctionTests {
new Date(key.window().start()), new Date(key.window().end()))));
}
}
}

View File

@@ -1,158 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
public class KafkaStreamsFunctionStateStoreTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@Test
public void testKafkaStreamsFuncionWithMultipleStateStores() throws Exception {
SpringApplication app = new SpringApplication(StateStoreTestApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count-1",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
receiveAndValidate(context);
}
}
private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
Thread.sleep(2000L);
StateStoreTestApplication processorApplication = context
.getBean(StateStoreTestApplication.class);
KeyValueStore<Long, Long> state1 = processorApplication.state1;
assertThat(processorApplication.processed).isTrue();
assertThat(state1 != null).isTrue();
assertThat(state1.name()).isEqualTo("my-store");
WindowStore<Long, Long> state2 = processorApplication.state2;
assertThat(state2 != null).isTrue();
assertThat(state2.name()).isEqualTo("other-store");
assertThat(state2.persistent()).isTrue();
}
finally {
pf.destroy();
}
}
@EnableBinding(KStreamProcessorX.class)
@EnableAutoConfiguration
static class StateStoreTestApplication {
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
boolean processed;
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
}
@Override
public void process(Object key, String value) {
processed = true;
}
@Override
public void close() {
if (state1 != null) {
state1.close();
}
if (state2 != null) {
state2.close();
}
}
}, "my-store", "other-store");
}
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
}
interface KStreamProcessorX {
@Input("input")
KStream<?, ?> input();
}
}

View File

@@ -72,6 +72,7 @@ public class StreamToGlobalKTableFunctionTests {
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process",
"--spring.cloud.stream.bindings.input.destination=orders",
"--spring.cloud.stream.bindings.input-x.destination=customers",
"--spring.cloud.stream.bindings.input-y.destination=products",

View File

@@ -231,6 +231,7 @@ public class StreamToTableJoinFunctionTests {
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process1",
"--spring.cloud.stream.bindings.input-1.destination=user-clicks-2",
"--spring.cloud.stream.bindings.input-2.destination=user-regions-2",
"--spring.cloud.stream.bindings.output.destination=output-topic-2",

View File

@@ -155,7 +155,7 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
receiveAndValidate(context);
// Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean("&stream-builder-WordCountProcessorApplication-process", StreamsBuilderFactoryBean.class);
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyWindowStore<Object, Object> store = kafkaStreams
.store("foo-WordCounts", QueryableStoreTypes.windowStore());

View File

@@ -108,7 +108,7 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
receiveAndValidateFoo(context);
// Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean("&stream-builder-ProductCountApplication-process", StreamsBuilderFactoryBean.class);
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean,
"cleanupConfig", CleanupConfig.class);
assertThat(cleanup.cleanupOnStart()).isFalse();

View File

@@ -1,105 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.stereotype.Component;
import static org.assertj.core.api.Assertions.assertThat;
public class MultiProcessorsWithSameNameTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
@Test
public void testBinderStartsSuccessfullyWhenTwoProcessorsWithSameNamesArePresent() {
SpringApplication app = new SpringApplication(
MultiProcessorsWithSameNameTests.WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.input-2.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
"--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id=basic-word-count",
"--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id=basic-word-count-1",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes="
+ embeddedKafka.getZookeeperConnectionString())) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean1 = context
.getBean("&stream-builder-Foo-process", StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean1).isNotNull();
StreamsBuilderFactoryBean streamsBuilderFactoryBean2 = context
.getBean("&stream-builder-Bar-process", StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean2).isNotNull();
}
}
@EnableBinding(KafkaStreamsProcessorX.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
static class WordCountProcessorApplication {
@Component
static class Foo {
@StreamListener
public void process(@Input("input-1") KStream<Object, String> input) {
}
}
//Second class with a stub processor that has the same name as above ("process")
@Component
static class Bar {
@StreamListener
public void process(@Input("input-2") KStream<Object, String> input) {
}
}
}
interface KafkaStreamsProcessorX {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
KStream<?, ?> input2();
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.0.M1</version>
</parent>
<dependencies>

View File

@@ -314,9 +314,7 @@ public class KafkaMessageChannelBinder extends
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
((DisposableBean) producerFB).destroy();
}
((DisposableBean) producerFB).destroy();
return partitionsFor;
}, destination.getName());
this.topicsInUse.put(destination.getName(),
@@ -535,7 +533,7 @@ public class KafkaMessageChannelBinder extends
messageListenerContainer
.setApplicationEventPublisher(getApplicationContext());
}
messageListenerContainer.setBeanName(destination + ".container");
messageListenerContainer.setBeanName(topics + ".container");
// end of these won't be needed...
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties()

View File

@@ -105,7 +105,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
assertThat(consumerConfigs.get("value.deserializer"))
.isEqualTo(LongDeserializer.class);
assertThat(consumerConfigs.get("value.serialized")).isNull();
assertThat(consumerConfigs.get("group.id")).isEqualTo("test");
assertThat(consumerConfigs.get("group.id")).isEqualTo("groupIdFromBootConfig");
assertThat(consumerConfigs.get("auto.offset.reset")).isEqualTo("earliest");
assertThat((((List<String>) consumerConfigs.get("bootstrap.servers"))
.containsAll(bootstrapServers))).isTrue();