Compare commits
1 commit
v2.2.0.REL ... v2.2.0.M1

| Author | SHA1 | Date |
|---|---|---|
| | c725fd1ba4 | |
@@ -132,7 +132,7 @@ If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.
+
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
+
Default: `true`.

@@ -162,9 +162,6 @@ Default: none.

[[kafka-consumer-properties]]
==== Kafka Consumer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.

The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
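
For example (the channel name `input` and the property `autoCommitOffset` here are illustrative, not part of this changeset), a consumer-scoped setting looks like this:

[source]
----
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
----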

@@ -287,9 +284,6 @@ Default: none (the binder-wide default of 1 is used).

[[kafka-producer-properties]]
==== Kafka Producer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.

The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
docs/pom.xml
@@ -7,7 +7,7 @@
	<parent>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<version>2.2.0.M1</version>
	</parent>
	<packaging>pom</packaging>
	<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -124,10 +124,10 @@
	<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
	<attributes>
		<spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
		<docs-url>https://cloud.spring.io/</docs-url>
		<docs-version></docs-version>
		<!-- <docs-version>${project.version}/</docs-version> -->
		<!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
		<!-- <docs-url>https://cloud.spring.io/</docs-url> -->
		<!-- <docs-version></docs-version> -->
		<docs-version>${project.version}/</docs-version>
		<docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
	</attributes>
</configuration>
<executions>
@@ -44,7 +44,7 @@ time window, and the computed results are sent to a downstream topic (e.g., `cou

[source]
----
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

	@StreamListener("input")
@@ -722,241 +722,3 @@ For more details about the health information, see the
https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html#production-ready-health[Spring Boot Actuator documentation].

NOTE: The status of the health indicator is `UP` if all the Kafka threads registered are in the `RUNNING` state.

=== Functional Kafka Streams Applications

Starting with 2.2.0.RELEASE, the Kafka Streams binder supports writing Kafka Streams applications simply by implementing the `java.util.function.Function` or `java.util.function.Consumer` interfaces in Java.
In this section, we see the details of how the functional support works in the binder.
The `StreamListener`-based model shown above can be converted as follows.

[source]
----
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

	@Bean
	public Function<KStream<?, String>, KStream<?, WordCount>> process() {
		return input ->
			input
				.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
				.groupBy((key, value) -> value)
				.windowedBy(TimeWindows.of(5000))
				.count(Materialized.as("WordCounts-multi"))
				.toStream()
				.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
	}

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}
----

The input is received from the input binding defined in the `KafkaStreamsProcessor` interface, and the output is sent to the output binding.
In this case, the input is a stream of `String` objects and the output is a stream of `WordCount` objects.

If the processor does not send any data on the outbound, it becomes a plain `Consumer` bean, as shown below.
In this example, we simply receive some data as a stream of `String` objects (`KStream<?, String>`) and possibly perform terminal operations on that data without sending any output.

[source]
----
@Bean
public Consumer<KStream<?, String>> process() {
	return input ->
		....
}
----

Applications are free to define custom bindings and use those instead of the out-of-the-box `KafkaStreamsProcessor` interface.
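
For reference, the stock `KafkaStreamsProcessor` contract is essentially a one-input, one-output binding interface; the sketch below reflects its general shape (the binding names `input` and `output` are the conventional defaults):

[source]
----
public interface KafkaStreamsProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Output("output")
	KStream<?, ?> output();
}
----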

==== Functions with multiple input bindings

With `StreamListener`, we define multiple `Input` bindings and then use them as inputs in the method.
With the functional approach, we still need to define those bindings in the binding interface. However, they cannot be used in the same way as in a `StreamListener` method.
We use curried functions to represent multiple input destinations in the same processor.
For instance, if a function has two inputs, the application defines two partial functions in the function bean method. Let's see some examples.

[source]
----
@Bean
public Function<KStream<String, Long>,
		Function<KTable<String, String>, KStream<String, Long>>> process() {
	return userClicksStream ->
		(userRegionsTable ->
			(userClicksStream
				.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
						"UNKNOWN" : region, clicks),
					Joined.with(Serdes.String(), Serdes.Long(), null))
				.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
					regionWithClicks.getClicks()))
				.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
				.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
				.toStream()));
}
----

In the above function bean, there are two inputs and one output.
The function returned from the method takes a `KStream` as its input, but its output is another function
that takes a `KTable` as its input. The output of that second function is a `KStream`, which becomes the output of the processor.
Another way to look at this is as follows:

Function 1: `Function<KStream<String, Long>, ...>` - takes a KStream input; returns "Function 2".
Function 2: `Function<KTable<String, String>, KStream<String, Long>>` - takes a KTable input; returns the `KStream<String, Long>` that becomes the output of the processor.

Both inputs are available as references in the method body, and the application can perform various operations on them.
In this example, we use function currying on two partial functions.
One thing to keep in mind is that the input bindings must follow a natural sort order when you have multiple input bindings; otherwise, the binder does not know which binding to bind to which function input.
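
To see the curried shape in isolation, here is a minimal, framework-free sketch (plain Java, no binder involved; the names are illustrative):

[source]
----
import java.util.function.Function;

public class CurryingSketch {

	// A curried two-argument function: applying the first input yields a
	// function that still expects the second input.
	static Function<Integer, Function<Integer, Integer>> add = a -> (b -> a + b);

	public static void main(String[] args) {
		// The binder performs the equivalent of this chained apply() per input binding.
		Integer result = add.apply(2).apply(3);
		System.out.println(result); // prints 5
	}
}
----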

Here is the corresponding binding interface for the above processor.

[source]
----
interface KStreamKTableProcessor {

	@Input("input-1")
	KStream<?, ?> input1();

	@Input("input-2")
	KTable<?, ?> input2();

	@Output("output")
	KStream<?, ?> output();

}
----

If you look at the two inputs, there is a natural sort order - that is, `input-1` goes to the first partial function and `input-2` goes to the second partial function.

Here is another example that shows multiple inputs with a `GlobalKTable`.

[source]
----
@Bean
public Function<KStream<Long, Order>,
		Function<GlobalKTable<Long, Customer>,
				Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {

	return orderStream -> (
		customers -> (
			products -> (
				orderStream.join(customers,
						(orderId, order) -> order.getCustomerId(),
						(order, customer) -> new CustomerOrder(customer, order))
					.join(products,
						(orderId, customerOrder) -> customerOrder
							.productId(),
						(customerOrder, product) -> {
							EnrichedOrder enrichedOrder = new EnrichedOrder();
							enrichedOrder.setProduct(product);
							enrichedOrder.setCustomer(customerOrder.customer);
							enrichedOrder.setOrder(customerOrder.order);
							return enrichedOrder;
						})
			)
		)
	);
}
----

Here we have three inputs. The first function takes a `KStream`, and its output is another `Function` that takes a `GlobalKTable` as its input and yet another function as its output.
This last function takes another `GlobalKTable` as its input and provides a `KStream` as its output, which is used as the processor's output.

Here is a sequential way to conceptualize this:

Function 1: `Function<KStream<Long, Order>, ...>` - takes a KStream input; returns "Function 2".
Function 2: `Function<GlobalKTable<Long, Customer>, ...>` - takes a GlobalKTable input; returns "Function 3".
Function 3: `Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>` - takes a GlobalKTable input; returns the `KStream` that becomes the output of the processor.

In this example, we have three curried functions. Behind the scenes, the binder calls the `apply` method on those functions in the order in which they appear.
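
Conceptually, the chained invocation the binder performs for this three-input processor looks like the following (a sketch; `orderStream`, `customers`, and `products` stand for the objects bound to `input-1`, `input-2`, and `input-3`):

[source]
----
KStream<Long, EnrichedOrder> output =
		process()
			.apply(orderStream)   // consumes input-1, yields Function 2
			.apply(customers)     // consumes input-2, yields Function 3
			.apply(products);     // consumes input-3, yields the processor's output
----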

Here is the corresponding binding interface for this application.

[source]
----
interface CustomGlobalKTableProcessor {

	@Input("input-1")
	KStream<?, ?> input1();

	@Input("input-2")
	GlobalKTable<?, ?> input2();

	@Input("input-3")
	GlobalKTable<?, ?> input3();

	@Output("output")
	KStream<?, ?> output();
}
----

Here also, the input bindings follow a natural order.

==== Multiple functions in the same application

Multiple functions can be defined in the same application.
When doing this, the binder first applies a natural sort on the function bean names and then applies input and output bindings to them in that order.
Consider the following two function beans in the same application.

[source]
----
@Bean
public Function<KStream<?, String>, KStream<?, WordCount>> process1() {

}

@Bean
public Function<KStream<?, String>, KStream<?, WordCount>> process2() {

}
----

Consider also the following binding interface.

[source]
----
interface Bindings {

	@Input("input-1")
	KStream<?, ?> input1();

	@Input("input-2")
	KStream<?, ?> input2();

	@Output("output-1")
	KStream<?, ?> output1();

	@Output("output-2")
	KStream<?, ?> output2();
}
----

The binder first takes the method `process1` and uses input binding `input-1` and output binding `output-1`.
Similarly, for the method `process2`, it uses input binding `input-2` and output binding `output-2`, as the configuration sketch below illustrates.
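
For example (a hypothetical configuration; the topic names are illustrative and not part of this changeset), the four bindings could be mapped to topics as follows:

[source]
----
spring.cloud.stream.bindings.input-1.destination=words-1
spring.cloud.stream.bindings.output-1.destination=counts-1
spring.cloud.stream.bindings.input-2.destination=words-2
spring.cloud.stream.bindings.output-2.destination=counts-2
----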

==== Using custom state stores in functional applications

You can define custom state stores as beans in your application, and those are detected and added to the Kafka Streams builder by the binder.
Note that, for regular `StreamListener`-based processors, you still need to use the `KafkaStreamsStateStore` annotation for custom state stores.
Here is an example of using custom state stores with the functional style described in this section.

[source]
----
@Bean
public StoreBuilder myStore() {
	return Stores.keyValueStoreBuilder(
		Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
		Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
	return Stores.windowStoreBuilder(
		Stores.persistentWindowStore("other-store",
			1L, 3, 3L, false), Serdes.Long(),
		Serdes.Long());
}
----

These state stores can then be accessed by the application directly.
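
For instance, a function-style processor can reach the registered store by name through the low-level Processor API (a minimal sketch assuming the `my-store` bean above, with `Long` keys and values; the summing logic is illustrative):

[source]
----
@Bean
public Consumer<KStream<Long, Long>> process() {
	return input -> input.process(() -> new Processor<Long, Long>() {

		private KeyValueStore<Long, Long> state;

		@Override
		@SuppressWarnings("unchecked")
		public void init(ProcessorContext context) {
			// Look up the custom store that the binder added to the topology.
			this.state = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
		}

		@Override
		public void process(Long key, Long value) {
			// Keep a running total per key in the custom store.
			Long current = this.state.get(key);
			this.state.put(key, current == null ? value : current + value);
		}

		@Override
		public void close() {
		}
	}, "my-store");
}
----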

@@ -112,7 +112,7 @@ If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.
+
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
+
Default: `true`.

@@ -142,9 +142,6 @@ Default: none.

[[kafka-consumer-properties]]
==== Kafka Consumer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.

The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.

@@ -267,9 +264,6 @@ Default: none (the binder-wide default of 1 is used).

[[kafka-producer-properties]]
==== Kafka Producer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.

The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
pom.xml
@@ -2,7 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
	<version>2.2.0.RELEASE</version>
	<version>2.2.0.M1</version>
	<packaging>pom</packaging>
	<parent>
		<groupId>org.springframework.cloud</groupId>
@@ -15,7 +15,7 @@
	<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
	<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
	<kafka.version>2.0.0</kafka.version>
	<spring-cloud-stream.version>2.2.0.RELEASE</spring-cloud-stream.version>
	<spring-cloud-stream.version>2.2.0.M1</spring-cloud-stream.version>
	<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
	<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
	<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

@@ -4,7 +4,7 @@
	<parent>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<version>2.2.0.M1</version>
	</parent>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	<description>Spring Cloud Starter Stream Kafka</description>

@@ -5,7 +5,7 @@
	<parent>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<version>2.2.0.M1</version>
	</parent>
	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
	<description>Spring Cloud Stream Kafka Binder Core</description>

@@ -25,8 +25,6 @@ import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

@@ -58,8 +56,6 @@ public class KafkaBinderConfigurationProperties {

	private static final String DEFAULT_KAFKA_CONNECTION_STRING = "localhost:9092";

	private final Log logger = LogFactory.getLog(getClass());

	private final Transaction transaction = new Transaction();

	private final KafkaProperties kafkaProperties;
@@ -533,7 +529,6 @@ public class KafkaBinderConfigurationProperties {
			}
		}
		consumerConfiguration.putAll(this.consumerProperties);
		filterStreamManagedConfiguration(consumerConfiguration);
		// Override Spring Boot bootstrap server setting if left to default with the value
		// configured in the binder
		return getConfigurationWithBootstrapServer(consumerConfiguration,
@@ -564,25 +559,6 @@ public class KafkaBinderConfigurationProperties {
				ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
	}

	private void filterStreamManagedConfiguration(Map<String, Object> configuration) {
		if (configuration.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
				&& configuration.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals(true)) {
			logger.warn(constructIgnoredConfigMessage(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) +
					ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "=true is not supported by the Kafka binder");
			configuration.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
		}
		if (configuration.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
			logger.warn(constructIgnoredConfigMessage(ConsumerConfig.GROUP_ID_CONFIG) +
					"Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify " +
					"the group instead of " + ConsumerConfig.GROUP_ID_CONFIG);
			configuration.remove(ConsumerConfig.GROUP_ID_CONFIG);
		}
	}

	private String constructIgnoredConfigMessage(String config) {
		return String.format("Ignoring provided value(s) for '%s'. ", config);
	}

	private Map<String, Object> getConfigurationWithBootstrapServer(
			Map<String, Object> configuration, String bootstrapServersConfig) {
		if (ObjectUtils.isEmpty(configuration.get(bootstrapServersConfig))) {
@@ -1,108 +0,0 @@
/*
 * Copyright 2018-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.properties;

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

import static org.assertj.core.api.Assertions.assertThat;

public class KafkaBinderConfigurationPropertiesTest {

	@Test
	public void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
		KafkaProperties kafkaProperties = new KafkaProperties();
		kafkaProperties.getConsumer().setGroupId("group1");
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
				new KafkaBinderConfigurationProperties(kafkaProperties);

		Map<String, Object> mergedConsumerConfiguration =
				kafkaBinderConfigurationProperties.mergedConsumerConfiguration();

		assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
	}

	@Test
	public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
		KafkaProperties kafkaProperties = new KafkaProperties();
		kafkaProperties.getConsumer().setEnableAutoCommit(true);
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
				new KafkaBinderConfigurationProperties(kafkaProperties);

		Map<String, Object> mergedConsumerConfiguration =
				kafkaBinderConfigurationProperties.mergedConsumerConfiguration();

		assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
	}

	@Test
	public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
		KafkaProperties kafkaProperties = new KafkaProperties();
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
				new KafkaBinderConfigurationProperties(kafkaProperties);
		kafkaBinderConfigurationProperties
				.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));

		Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();

		assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
	}

	@Test
	public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
		KafkaProperties kafkaProperties = new KafkaProperties();
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
				new KafkaBinderConfigurationProperties(kafkaProperties);
		kafkaBinderConfigurationProperties
				.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));

		Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();

		assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
	}

	@Test
	public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
		KafkaProperties kafkaProperties = new KafkaProperties();
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
				new KafkaBinderConfigurationProperties(kafkaProperties);
		kafkaBinderConfigurationProperties
				.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));

		Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();

		assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
	}

	@Test
	public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
		KafkaProperties kafkaProperties = new KafkaProperties();
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
				new KafkaBinderConfigurationProperties(kafkaProperties);
		kafkaBinderConfigurationProperties
				.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));

		Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();

		assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
	}
}

@@ -10,7 +10,7 @@
	<parent>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<version>2.2.0.M1</version>
	</parent>

	<properties>

@@ -27,12 +27,10 @@ import org.springframework.context.annotation.Configuration;

/**
 * Application support configuration for Kafka Streams binder.
 *
 * @deprecated Features provided on this class can be directly configured in the application itself using Kafka Streams.
 * @author Soby Chacko
 */
@Configuration
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
@Deprecated
public class KafkaStreamsApplicationSupportAutoConfiguration {

	@Bean

@@ -31,13 +31,12 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.stream.binder.BinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.function.FunctionDetectorCondition;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.serde.CompositeNonNativeSerde;
@@ -49,7 +48,6 @@ import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
@@ -242,7 +240,8 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
	}

	@Bean
	@Conditional(FunctionDetectorCondition.class)
	// @ConditionalOnProperty("spring.cloud.stream.kafka.streams.function.definition")
	@ConditionalOnProperty("spring.cloud.stream.function.definition")
	public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
			KeyValueSerdeResolver keyValueSerdeResolver,
@@ -303,7 +302,6 @@ public class KafkaStreamsBinderSupportAutoConfiguration {

	@Bean
	@SuppressWarnings("unchecked")
	@ConditionalOnMissingBean
	public KeyValueSerdeResolver keyValueSerdeResolver(
			@Qualifier("streamConfigGlobalProperties") Object streamConfigGlobalProperties,
			KafkaStreamsBinderConfigurationProperties properties) {

@@ -40,7 +40,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
@@ -49,7 +48,6 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.core.FluxedFunction;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
@@ -69,12 +67,10 @@ import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * @author Soby Chacko
 * @since 2.2.0
 */
public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
@@ -92,8 +88,6 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {

	private ConfigurableApplicationContext applicationContext;

	private Set<String> origInputs = new TreeSet<>();
	private Set<String> origOutputs = new TreeSet<>();

	public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
@@ -111,61 +105,50 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
		this.cleanupConfig = cleanupConfig;
		this.functionCatalog = functionCatalog;
		this.bindableProxyFactory = bindableProxyFactory;
		this.origInputs.addAll(this.bindableProxyFactory.getInputs());
		this.origOutputs.addAll(this.bindableProxyFactory.getOutputs());
	}

	private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType) {
		int inputCount = 1;
		final Set<String> inputs = new TreeSet<>(this.bindableProxyFactory.getInputs());

		ResolvableType resolvableTypeGeneric = resolvableType.getGeneric(1);
		while (resolvableTypeGeneric != null && resolvableTypeGeneric.getRawClass() != null && (resolvableTypeGeneric.getRawClass().equals(Function.class) ||
				resolvableTypeGeneric.getRawClass().equals(Consumer.class))) {
			inputCount++;
			resolvableTypeGeneric = resolvableTypeGeneric.getGeneric(1);
		}

		final Set<String> inputs = new TreeSet<>(origInputs);
		Map<String, ResolvableType> resolvableTypeMap = new LinkedHashMap<>();
		Map<String, ResolvableType> map = new LinkedHashMap<>();
		final Iterator<String> iterator = inputs.iterator();

		final String next = iterator.next();
		resolvableTypeMap.put(next, resolvableType.getGeneric(0));
		origInputs.remove(next);
		if (iterator.hasNext()) {
			map.put(iterator.next(), resolvableType.getGeneric(0));
			ResolvableType generic = resolvableType.getGeneric(1);

		for (int i = 1; i < inputCount; i++) {
			if (iterator.hasNext()) {
			ResolvableType generic = resolvableType.getGeneric(1);
			while (iterator.hasNext() && generic != null) {
				if (generic.getRawClass() != null &&
						(generic.getRawClass().equals(Function.class) ||
								generic.getRawClass().equals(Consumer.class))) {
					final String next1 = iterator.next();
					resolvableTypeMap.put(next1, generic.getGeneric(0));
					origInputs.remove(next1);
					map.put(iterator.next(), generic.getGeneric(0));
				}
				generic = generic.getGeneric(1);
			}
		}
		return resolvableTypeMap;

		return map;
	}

	@SuppressWarnings({ "unchecked", "rawtypes" })
	public void orchestrateFunctionInvoking(ResolvableType resolvableType, String functionName) {
	@SuppressWarnings("unchecked")
	public void orchestrateStreamListenerSetupMethod(ResolvableType resolvableType, String functionName) {
		final Set<String> outputs = new TreeSet<>(this.bindableProxyFactory.getOutputs());

		String[] methodAnnotatedOutboundNames = new String[outputs.size()];
		int j = 0;
		for (String output : outputs) {
			methodAnnotatedOutboundNames[j++] = output;
		}

		final Map<String, ResolvableType> stringResolvableTypeMap = buildTypeMap(resolvableType);
		Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(stringResolvableTypeMap, functionName);
		Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(stringResolvableTypeMap, "foobar");
		try {
			if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(Consumer.class)) {
				FluxedConsumer fluxedConsumer = functionCatalog.lookup(FluxedConsumer.class, functionName);
				Assert.isTrue(fluxedConsumer != null,
						"No corresponding consumer beans found in the catalog");
				Object target = fluxedConsumer.getTarget();

				Consumer<Object> consumer = Consumer.class.isAssignableFrom(target.getClass()) ? (Consumer) target : null;

				if (consumer != null) {
					consumer.accept(adaptedInboundArguments[0]);
				}
				Consumer<Object> consumer = functionCatalog.lookup(Consumer.class, functionName);
				consumer.accept(adaptedInboundArguments[0]);
			}
			else {

				Function<Object, Object> function = functionCatalog.lookup(Function.class, functionName);
				Object target = null;
				if (function instanceof FluxedFunction) {
@@ -185,20 +168,15 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
				i++;
			}
			if (result != null) {
				final Set<String> outputs = new TreeSet<>(origOutputs);
				final Iterator<String> iterator = outputs.iterator();

				if (result.getClass().isArray()) {
					final int length = ((Object[]) result).length;
					String[] methodAnnotatedOutboundNames = new String[length];

					for (int j = 0; j < length; j++) {
						if (iterator.hasNext()) {
							final String next = iterator.next();
							methodAnnotatedOutboundNames[j] = next;
							this.origOutputs.remove(next);
						}
					}
					Assert.isTrue(methodAnnotatedOutboundNames.length == ((Object[]) result).length,
							"Result does not match with the number of declared outbounds");
				}
				else {
					Assert.isTrue(methodAnnotatedOutboundNames.length == 1,
							"Result does not match with the number of declared outbounds");
				}
				if (result.getClass().isArray()) {
					Object[] outboundKStreams = (Object[]) result;
					int k = 0;
					for (Object outboundKStream : outboundKStreams) {
@@ -210,15 +188,11 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
					}
				}
				else {
					if (iterator.hasNext()) {
						final String next = iterator.next();
						Object targetBean = this.applicationContext.getBean(next);
						this.origOutputs.remove(next);
					Object targetBean = this.applicationContext.getBean(methodAnnotatedOutboundNames[0]);

						KStreamBoundElementFactory.KStreamWrapper
								boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
						boundElement.wrap((KStream) result);
					}
					KStreamBoundElementFactory.KStreamWrapper
							boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
					boundElement.wrap((KStream) result);
				}
			}
		}
@@ -253,6 +227,7 @@
		KafkaStreamsConsumerProperties extendedConsumerProperties =
				this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(input);
		//get state store spec
		//KafkaStreamsStateStoreProperties spec = buildStateStoreSpec(method);
		Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties);
		Serde<?> valueSerde = this.keyValueSerdeResolver.getInboundValueSerde(
				bindingProperties.getConsumer(), extendedConsumerProperties);
@@ -389,21 +364,6 @@
			BindingProperties bindingProperties,
			StreamsBuilder streamsBuilder,
			Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
		try {
			final Map<String, StoreBuilder> storeBuilders = applicationContext.getBeansOfType(StoreBuilder.class);
			if (!CollectionUtils.isEmpty(storeBuilders)) {
				storeBuilders.values().forEach(storeBuilder -> {
					streamsBuilder.addStateStore(storeBuilder);
					if (LOG.isInfoEnabled()) {
						LOG.info("state store " + storeBuilder.name() + " added to topology");
					}
				});
			}
		}
		catch (Exception e) {
			// Pass through.
		}

		String[] bindingTargets = StringUtils
				.commaDelimitedListToStringArray(this.bindingServiceProperties.getBindingDestination(inboundName));

@@ -19,12 +19,10 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
@@ -79,15 +77,14 @@ public class KafkaStreamsMessageConversionDelegate {
	 * @param outboundBindTarget outbound KStream target
	 * @return serialized KStream
	 */
	@SuppressWarnings({"rawtypes", "unchecked"})
	@SuppressWarnings("rawtypes")
	public KStream serializeOnOutbound(KStream<?, ?> outboundBindTarget) {
		String contentType = this.kstreamBindingInformationCatalogue
				.getContentType(outboundBindTarget);
		MessageConverter messageConverter = this.compositeMessageConverterFactory
				.getMessageConverterForAllRegistered();
		final PerRecordContentTypeHolder perRecordContentTypeHolder = new PerRecordContentTypeHolder();

		final KStream<?, ?> kStreamWithEnrichedHeaders = outboundBindTarget.mapValues((v) -> {
		return outboundBindTarget.mapValues((v) -> {
			Message<?> message = v instanceof Message<?> ? (Message<?>) v
					: MessageBuilder.withPayload(v).build();
			Map<String, Object> headers = new HashMap<>(message.getHeaders());
@@ -95,46 +92,9 @@ public class KafkaStreamsMessageConversionDelegate {
				headers.put(MessageHeaders.CONTENT_TYPE, contentType);
			}
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			final Message<?> convertedMessage = messageConverter.toMessage(message.getPayload(), messageHeaders);
			perRecordContentTypeHolder.setContentType((String) messageHeaders.get(MessageHeaders.CONTENT_TYPE));
			return convertedMessage.getPayload();
			return messageConverter.toMessage(message.getPayload(), messageHeaders)
					.getPayload();
		});

		kStreamWithEnrichedHeaders.process(() -> new Processor() {

			ProcessorContext context;

			@Override
			public void init(ProcessorContext context) {
				this.context = context;
			}

			@Override
			public void process(Object key, Object value) {
				if (perRecordContentTypeHolder.contentType != null) {
					this.context.headers().remove(MessageHeaders.CONTENT_TYPE);
					final Header header;
					try {
						header = new RecordHeader(MessageHeaders.CONTENT_TYPE,
								new ObjectMapper().writeValueAsBytes(perRecordContentTypeHolder.contentType));
						this.context.headers().add(header);
					}
					catch (Exception e) {
						if (LOG.isDebugEnabled()) {
							LOG.debug("Could not add content type header");
						}
					}
					perRecordContentTypeHolder.unsetContentType();
				}
			}

			@Override
			public void close() {

			}
		});

		return kStreamWithEnrichedHeaders;
	}

	/**
@@ -323,10 +283,6 @@ public class KafkaStreamsMessageConversionDelegate {
			this.contentType = contentType;
		}

		void unsetContentType() {
			this.contentType = null;
		}

	}

}

@@ -569,11 +569,10 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
				(Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(),
				() -> streamsBuilder)
				.getRawBeanDefinition();
		final String beanNamePostFix = method.getDeclaringClass().getSimpleName() + "-" + method.getName();
		((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(
				"stream-builder-" + beanNamePostFix, streamsBuilderBeanDefinition);
				"stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
		StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean(
				"&stream-builder-" + beanNamePostFix, StreamsBuilderFactoryBean.class);
				"&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
		this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
	}

@@ -53,7 +53,7 @@ import org.springframework.util.StringUtils;
 * @author Soby Chacko
 * @author Lei Chen
 */
public class KeyValueSerdeResolver {
class KeyValueSerdeResolver {

	private final Map<String, Object> streamConfigGlobalProperties;

@@ -1,87 +0,0 @@
/*
 * Copyright 2019-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.function;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.util.ClassUtils;

/**
 * Custom {@link org.springframework.context.annotation.Condition} that detects the presence
 * of java.util.Function|Consumer beans. Used for Kafka Streams function support.
 *
 * @author Soby Chakco
 * @since 2.2.0
 */
public class FunctionDetectorCondition extends SpringBootCondition {

	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
		if (context != null && context.getBeanFactory() != null) {
			Map functionTypes = context.getBeanFactory().getBeansOfType(Function.class);
			functionTypes.putAll(context.getBeanFactory().getBeansOfType(Consumer.class));
			final Map<String, Object> kstreamFunctions = pruneFunctionBeansForKafkaStreams(functionTypes, context);

			if (!kstreamFunctions.isEmpty()) {
				return ConditionOutcome.match("Matched. Function/Consumer beans found");
			}
			else {
				return ConditionOutcome.noMatch("No match. No Function/Consumer beans found");
			}
		}
		return ConditionOutcome.noMatch("No match. No Function/Consumer beans found");
	}

	private static <T> Map<String, T> pruneFunctionBeansForKafkaStreams(Map<String, T> originalFunctionBeans,
			ConditionContext context) {
		final Map<String, T> prunedMap = new HashMap<>();

		for (String key : originalFunctionBeans.keySet()) {
			final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
					context.getBeanFactory().getBeanDefinition(key))
					.getMetadata().getClassName(),
					ClassUtils.getDefaultClassLoader());
			try {
				Method method = classObj.getMethod(key);
				ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
				final Class<?> rawClass = resolvableType.getGeneric(0).getRawClass();
				if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
					prunedMap.put(key, originalFunctionBeans.get(key));
				}
			}
			catch (NoSuchMethodException e) {
				//ignore
			}
		}
		return prunedMap;
	}
}
@@ -16,32 +16,34 @@

package org.springframework.cloud.stream.binder.kafka.streams.function;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

/**
 * @author Soby Chacko
 * @since 2.2.0
 */
@Configuration
@ConditionalOnProperty("spring.cloud.stream.function.definition")
@EnableConfigurationProperties(StreamFunctionProperties.class)
public class KafkaStreamsFunctionAutoConfiguration {

	@Bean
	@Conditional(FunctionDetectorCondition.class)
	public KafkaStreamsFunctionProcessorInvoker kafkaStreamsFunctionProcessorInvoker(
			KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor,
			KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor) {
		return new KafkaStreamsFunctionProcessorInvoker(kafkaStreamsFunctionBeanPostProcessor.getResolvableTypes(),
				kafkaStreamsFunctionProcessor);
			KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor,
			StreamFunctionProperties properties) {
		return new KafkaStreamsFunctionProcessorInvoker(kafkaStreamsFunctionBeanPostProcessor.getResolvableType(),
				properties.getDefinition(), kafkaStreamsFunctionProcessor);
	}

	@Bean
	public KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor() {
		return new KafkaStreamsFunctionBeanPostProcessor();
	public KafkaStreamsFunctionBeanPostProcessor kafkaStreamsFunctionBeanPostProcessor(
			StreamFunctionProperties properties) {
		return new KafkaStreamsFunctionBeanPostProcessor(properties);
	}

}

@@ -17,11 +17,6 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
@@ -29,42 +24,40 @@ import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.ResolvableType;
import org.springframework.util.ClassUtils;

/**
 *
 * @author Soby Chacko
 * @since 2.2.0
 * @since 2.1.0
 *
 */
class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware {

	private final StreamFunctionProperties kafkaStreamsFunctionProperties;
	private ConfigurableListableBeanFactory beanFactory;
	private Map<String, ResolvableType> resolvableTypeMap = new TreeMap<>();
	private ResolvableType resolvableType;

	public Map<String, ResolvableType> getResolvableTypes() {
		return this.resolvableTypeMap;
	KafkaStreamsFunctionBeanPostProcessor(StreamFunctionProperties properties) {
		this.kafkaStreamsFunctionProperties = properties;
	}

	public ResolvableType getResolvableType() {
		return this.resolvableType;
	}

	@Override
	public void afterPropertiesSet() {

		String[] functionNames = this.beanFactory.getBeanNamesForType(Function.class);
		String[] consumerNames = this.beanFactory.getBeanNamesForType(Consumer.class);

		Stream.concat(Stream.of(functionNames), Stream.of(consumerNames)).forEach(this::extractResolvableTypes);
	}

	private void extractResolvableTypes(String key) {
	public void afterPropertiesSet() throws Exception {
		final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
				this.beanFactory.getBeanDefinition(key))
				this.beanFactory.getBeanDefinition(kafkaStreamsFunctionProperties.getDefinition()))
				.getMetadata().getClassName(),
				ClassUtils.getDefaultClassLoader());

		try {
			Method method = classObj.getMethod(key);
			ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
			resolvableTypeMap.put(key, resolvableType);
			Method method = classObj.getMethod(this.kafkaStreamsFunctionProperties.getDefinition());
			this.resolvableType = ResolvableType.forMethodReturnType(method, classObj);
		}
		catch (NoSuchMethodException e) {
			//ignore

@@ -16,8 +16,6 @@

package org.springframework.cloud.stream.binder.kafka.streams.function;

import java.util.Map;

import javax.annotation.PostConstruct;

import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor;
@@ -31,17 +29,18 @@ import org.springframework.core.ResolvableType;
class KafkaStreamsFunctionProcessorInvoker {

	private final KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor;
	private final Map<String, ResolvableType> resolvableTypeMap;
	private final ResolvableType resolvableType;
	private final String functionName;

	KafkaStreamsFunctionProcessorInvoker(Map<String, ResolvableType> resolvableTypeMap,
			KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor) {
	KafkaStreamsFunctionProcessorInvoker(ResolvableType resolvableType, String functionName,
			KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor) {
		this.kafkaStreamsFunctionProcessor = kafkaStreamsFunctionProcessor;
		this.resolvableTypeMap = resolvableTypeMap;
		this.resolvableType = resolvableType;
		this.functionName = functionName;
	}

	@PostConstruct
	void invoke() {
		resolvableTypeMap.forEach((key, value) ->
				this.kafkaStreamsFunctionProcessor.orchestrateFunctionInvoking(value, key));
		this.kafkaStreamsFunctionProcessor.orchestrateStreamListenerSetupMethod(resolvableType, functionName);
	}
}

@@ -26,7 +26,6 @@ import org.springframework.cloud.function.context.WrapperDetector;

/**
 * @author Soby Chacko
 * @since 2.2.0
 */
public class KafkaStreamsFunctionWrapperDetector implements WrapperDetector {
	@Override

@@ -25,11 +25,9 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 * stream processing and one can provide window specific properties at runtime and use
 * those properties in the applications using this class.
 *
 * @deprecated The properties exposed by this class can be used directly on Kafka Streams API in the application.
 * @author Soby Chacko
 */
@ConfigurationProperties("spring.cloud.stream.kafka.streams")
@Deprecated
public class KafkaStreamsApplicationSupportProperties {

	private TimeWindow timeWindow;

@@ -85,6 +85,7 @@ public class KafkaStreamsBinderWordCountBranchesFunctionTests {

		ConfigurableApplicationContext context = app.run("--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.function.definition=process",
				"--spring.cloud.stream.bindings.input.destination=words",
				"--spring.cloud.stream.bindings.output1.destination=counts",
				"--spring.cloud.stream.bindings.output1.contentType=application/json",

@@ -85,6 +85,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {

		try (ConfigurableApplicationContext context = app.run("--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.function.definition=process",
				"--spring.cloud.stream.bindings.input.destination=words",
				"--spring.cloud.stream.bindings.output.destination=counts",
				"--spring.cloud.stream.bindings.output.contentType=application/json",

@@ -185,4 +186,5 @@ public class KafkaStreamsBinderWordCountFunctionTests {
						new Date(key.window().start()), new Date(key.window().end()))));
	}
}

}

@@ -1,158 +0,0 @@
/*
 * Copyright 2019-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.function;

import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

public class KafkaStreamsFunctionStateStoreTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
            "counts");

    private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();

    @Test
    public void testKafkaStreamsFunctionWithMultipleStateStores() throws Exception {
        SpringApplication app = new SpringApplication(StateStoreTestApplication.class);
        app.setWebApplicationType(WebApplicationType.NONE);

        try (ConfigurableApplicationContext context = app.run("--server.port=0",
                "--spring.jmx.enabled=false",
                "--spring.cloud.stream.bindings.input.destination=words",
                "--spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count-1",
                "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
                "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
                        "=org.apache.kafka.common.serialization.Serdes$StringSerde",
                "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
                        "=org.apache.kafka.common.serialization.Serdes$StringSerde",
                "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
            receiveAndValidate(context);
        }
    }

    private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        try {
            KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
            template.setDefaultTopic("words");
            template.sendDefault("foobar");
            Thread.sleep(2000L);
            StateStoreTestApplication processorApplication = context
                    .getBean(StateStoreTestApplication.class);

            KeyValueStore<Long, Long> state1 = processorApplication.state1;
            assertThat(processorApplication.processed).isTrue();
            assertThat(state1).isNotNull();
            assertThat(state1.name()).isEqualTo("my-store");
            WindowStore<Long, Long> state2 = processorApplication.state2;
            assertThat(state2).isNotNull();
            assertThat(state2.name()).isEqualTo("other-store");
            assertThat(state2.persistent()).isTrue();
        }
        finally {
            pf.destroy();
        }
    }

    @EnableBinding(KStreamProcessorX.class)
    @EnableAutoConfiguration
    static class StateStoreTestApplication {

        KeyValueStore<Long, Long> state1;
        WindowStore<Long, Long> state2;

        boolean processed;

        @Bean
        public java.util.function.Consumer<KStream<Object, String>> process() {
            return input ->
                    input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
                        @Override
                        @SuppressWarnings("unchecked")
                        public void init(ProcessorContext context) {
                            state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
                            state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
                        }

                        @Override
                        public void process(Object key, String value) {
                            processed = true;
                        }

                        @Override
                        public void close() {
                            if (state1 != null) {
                                state1.close();
                            }
                            if (state2 != null) {
                                state2.close();
                            }
                        }
                    }, "my-store", "other-store");
        }

        @Bean
        public StoreBuilder myStore() {
            return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
                    Serdes.Long());
        }

        @Bean
        public StoreBuilder otherStore() {
            return Stores.windowStoreBuilder(
                    Stores.persistentWindowStore("other-store",
                            1L, 3, 3L, false), Serdes.Long(),
                    Serdes.Long());
        }
    }

    interface KStreamProcessorX {

        @Input("input")
        KStream<?, ?> input();
    }
}
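The `otherStore()` bean in the deleted test above uses the long-argument `Stores.persistentWindowStore` overload, which later Kafka clients deprecate. A sketch of the `Duration`-based equivalent, assuming Kafka clients 2.1+; the retention period and window size are illustrative assumptions, not values from this commit:

[source,java]
----
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

class OtherStoreSketch {

    // Same "other-store" as above, built with the Duration-based overload.
    // Retention must be at least the window size; 3 and 1 minutes are
    // placeholder choices.
    StoreBuilder<WindowStore<Long, Long>> otherStore() {
        return Stores.windowStoreBuilder(
                Stores.persistentWindowStore("other-store",
                        Duration.ofMinutes(3), Duration.ofMinutes(1), false),
                Serdes.Long(), Serdes.Long());
    }
}
----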
@@ -72,6 +72,7 @@ public class StreamToGlobalKTableFunctionTests {
        app.setWebApplicationType(WebApplicationType.NONE);
        try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
                "--spring.jmx.enabled=false",
                "--spring.cloud.stream.function.definition=process",
                "--spring.cloud.stream.bindings.input.destination=orders",
                "--spring.cloud.stream.bindings.input-x.destination=customers",
                "--spring.cloud.stream.bindings.input-y.destination=products",

@@ -231,6 +231,7 @@ public class StreamToTableJoinFunctionTests {

        try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
                "--spring.jmx.enabled=false",
                "--spring.cloud.stream.function.definition=process1",
                "--spring.cloud.stream.bindings.input-1.destination=user-clicks-2",
                "--spring.cloud.stream.bindings.input-2.destination=user-regions-2",
                "--spring.cloud.stream.bindings.output.destination=output-topic-2",
@@ -155,7 +155,7 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
            receiveAndValidate(context);
            // Assertions on StreamsBuilderFactoryBean
            StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
                    .getBean("&stream-builder-WordCountProcessorApplication-process", StreamsBuilderFactoryBean.class);
                    .getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
            KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
            ReadOnlyWindowStore<Object, Object> store = kafkaStreams
                    .store("foo-WordCounts", QueryableStoreTypes.windowStore());
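For context, the `foo-WordCounts` store retrieved above supports interactive queries. A minimal sketch, assuming a running `KafkaStreams` instance; the store name comes from the test, while the queried word and time range are illustrative:

[source,java]
----
import java.util.Date;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

class WordCountStoreQuery {

    // Prints every windowed count recorded for one word since the epoch.
    static void printCounts(KafkaStreams kafkaStreams, String word) {
        ReadOnlyWindowStore<String, Long> store = kafkaStreams
                .store("foo-WordCounts", QueryableStoreTypes.windowStore());
        try (WindowStoreIterator<Long> values =
                store.fetch(word, 0L, System.currentTimeMillis())) {
            while (values.hasNext()) {
                // The iterator key is the window start timestamp.
                KeyValue<Long, Long> entry = values.next();
                System.out.println(new Date(entry.key) + " -> " + entry.value);
            }
        }
    }
}
----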
@@ -108,7 +108,7 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
            receiveAndValidateFoo(context);
            // Assertions on StreamsBuilderFactoryBean
            StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
                    .getBean("&stream-builder-ProductCountApplication-process", StreamsBuilderFactoryBean.class);
                    .getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
            CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean,
                    "cleanupConfig", CleanupConfig.class);
            assertThat(cleanup.cleanupOnStart()).isFalse();
@@ -1,105 +0,0 @@
/*
 * Copyright 2019-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.integration;

import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.stereotype.Component;

import static org.assertj.core.api.Assertions.assertThat;

public class MultiProcessorsWithSameNameTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
            "counts");

    private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
            .getEmbeddedKafka();

    @Test
    public void testBinderStartsSuccessfullyWhenTwoProcessorsWithSameNamesArePresent() {
        SpringApplication app = new SpringApplication(
                MultiProcessorsWithSameNameTests.WordCountProcessorApplication.class);
        app.setWebApplicationType(WebApplicationType.NONE);

        try (ConfigurableApplicationContext context = app.run("--server.port=0",
                "--spring.jmx.enabled=false",
                "--spring.cloud.stream.bindings.input.destination=words",
                "--spring.cloud.stream.bindings.input-2.destination=words",
                "--spring.cloud.stream.bindings.output.destination=counts",
                "--spring.cloud.stream.bindings.output.contentType=application/json",
                "--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id=basic-word-count",
                "--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id=basic-word-count-1",
                "--spring.cloud.stream.kafka.streams.binder.brokers="
                        + embeddedKafka.getBrokersAsString(),
                "--spring.cloud.stream.kafka.streams.binder.zkNodes="
                        + embeddedKafka.getZookeeperConnectionString())) {
            StreamsBuilderFactoryBean streamsBuilderFactoryBean1 = context
                    .getBean("&stream-builder-Foo-process", StreamsBuilderFactoryBean.class);
            assertThat(streamsBuilderFactoryBean1).isNotNull();
            StreamsBuilderFactoryBean streamsBuilderFactoryBean2 = context
                    .getBean("&stream-builder-Bar-process", StreamsBuilderFactoryBean.class);
            assertThat(streamsBuilderFactoryBean2).isNotNull();
        }
    }

    @EnableBinding(KafkaStreamsProcessorX.class)
    @EnableAutoConfiguration
    @EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
    static class WordCountProcessorApplication {

        @Component
        static class Foo {

            @StreamListener
            public void process(@Input("input-1") KStream<Object, String> input) {
            }
        }

        // Second class with a stub processor that has the same name as above ("process")
        @Component
        static class Bar {

            @StreamListener
            public void process(@Input("input-2") KStream<Object, String> input) {
            }
        }
    }

    interface KafkaStreamsProcessorX {

        @Input("input-1")
        KStream<?, ?> input1();

        @Input("input-2")
        KStream<?, ?> input2();
    }
}
@@ -10,7 +10,7 @@
    <parent>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <version>2.2.0.M1</version>
    </parent>

    <dependencies>
@@ -314,9 +314,7 @@ public class KafkaMessageChannelBinder extends
                List<PartitionInfo> partitionsFor = producer
                        .partitionsFor(destination.getName());
                producer.close();
                if (this.transactionManager == null) {
                    ((DisposableBean) producerFB).destroy();
                }
                ((DisposableBean) producerFB).destroy();
                return partitionsFor;
            }, destination.getName());
            this.topicsInUse.put(destination.getName(),
@@ -535,7 +533,7 @@ public class KafkaMessageChannelBinder extends
                messageListenerContainer
                        .setApplicationEventPublisher(getApplicationContext());
            }
            messageListenerContainer.setBeanName(destination + ".container");
            messageListenerContainer.setBeanName(topics + ".container");
            // end of these won't be needed...
            if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
                messageListenerContainer.getContainerProperties()
@@ -105,7 +105,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
        assertThat(consumerConfigs.get("value.deserializer"))
                .isEqualTo(LongDeserializer.class);
        assertThat(consumerConfigs.get("value.serializer")).isNull();
        assertThat(consumerConfigs.get("group.id")).isEqualTo("test");
        assertThat(consumerConfigs.get("group.id")).isEqualTo("groupIdFromBootConfig");
        assertThat(consumerConfigs.get("auto.offset.reset")).isEqualTo("earliest");
        assertThat((((List<String>) consumerConfigs.get("bootstrap.servers"))
                .containsAll(bootstrapServers))).isTrue();