Compare commits
10 Commits
v2.2.0.M1
...
v2.2.0.RC1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
383add504a | ||
|
|
5f9395a5ec | ||
|
|
70eb25d413 | ||
|
|
6bc74c6e5c | ||
|
|
33603c62f0 | ||
|
|
efd46835a1 | ||
|
|
3a267bc751 | ||
|
|
62e98df0c7 | ||
|
|
9e156911b4 | ||
|
|
cd28454818 |
@@ -132,7 +132,7 @@ If set to `true`, the binder creates new topics automatically.
|
||||
If set to `false`, the binder relies on the topics being already configured.
|
||||
In the latter case, if the topics do not exist, the binder fails to start.
|
||||
+
|
||||
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
|
||||
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
|
||||
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
|
||||
+
|
||||
Default: `true`.
|
||||
@@ -162,6 +162,9 @@ Default: none.
|
||||
[[kafka-consumer-properties]]
|
||||
==== Kafka Consumer Properties
|
||||
|
||||
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
|
||||
|
||||
|
||||
The following properties are available for Kafka consumers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
||||
|
||||
@@ -284,6 +287,9 @@ Default: none (the binder-wide default of 1 is used).
|
||||
[[kafka-producer-properties]]
|
||||
==== Kafka Producer Properties
|
||||
|
||||
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
|
||||
|
||||
|
||||
The following properties are available for Kafka producers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.2.0.RC1</version>
|
||||
</parent>
|
||||
<packaging>pom</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
@@ -112,7 +112,7 @@ If set to `true`, the binder creates new topics automatically.
|
||||
If set to `false`, the binder relies on the topics being already configured.
|
||||
In the latter case, if the topics do not exist, the binder fails to start.
|
||||
+
|
||||
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
|
||||
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
|
||||
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
|
||||
+
|
||||
Default: `true`.
|
||||
@@ -142,6 +142,9 @@ Default: none.
|
||||
[[kafka-consumer-properties]]
|
||||
==== Kafka Consumer Properties
|
||||
|
||||
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
|
||||
|
||||
|
||||
The following properties are available for Kafka consumers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
||||
|
||||
@@ -264,6 +267,9 @@ Default: none (the binder-wide default of 1 is used).
|
||||
[[kafka-producer-properties]]
|
||||
==== Kafka Producer Properties
|
||||
|
||||
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
|
||||
|
||||
|
||||
The following properties are available for Kafka producers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
|
||||
|
||||
|
||||
6
pom.xml
6
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.2.0.RC1</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.1.4.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.4.RELEASE</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
@@ -15,7 +15,7 @@
|
||||
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>2.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>2.2.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>2.2.0.RC1</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.2.0.RC1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.2.0.RC1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -25,6 +25,8 @@ import javax.validation.constraints.AssertTrue;
|
||||
import javax.validation.constraints.Min;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
|
||||
@@ -56,6 +58,8 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private static final String DEFAULT_KAFKA_CONNECTION_STRING = "localhost:9092";
|
||||
|
||||
private final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
private final Transaction transaction = new Transaction();
|
||||
|
||||
private final KafkaProperties kafkaProperties;
|
||||
@@ -529,6 +533,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
}
|
||||
}
|
||||
consumerConfiguration.putAll(this.consumerProperties);
|
||||
filterStreamManagedConfiguration(consumerConfiguration);
|
||||
// Override Spring Boot bootstrap server setting if left to default with the value
|
||||
// configured in the binder
|
||||
return getConfigurationWithBootstrapServer(consumerConfiguration,
|
||||
@@ -559,6 +564,25 @@ public class KafkaBinderConfigurationProperties {
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
|
||||
}
|
||||
|
||||
private void filterStreamManagedConfiguration(Map<String, Object> configuration) {
|
||||
if (configuration.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
|
||||
&& configuration.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals(true)) {
|
||||
logger.warn(constructIgnoredConfigMessage(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) +
|
||||
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "=true is not supported by the Kafka binder");
|
||||
configuration.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
|
||||
}
|
||||
if (configuration.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
|
||||
logger.warn(constructIgnoredConfigMessage(ConsumerConfig.GROUP_ID_CONFIG) +
|
||||
"Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify " +
|
||||
"the group instead of " + ConsumerConfig.GROUP_ID_CONFIG);
|
||||
configuration.remove(ConsumerConfig.GROUP_ID_CONFIG);
|
||||
}
|
||||
}
|
||||
|
||||
private String constructIgnoredConfigMessage(String config) {
|
||||
return String.format("Ignoring provided value(s) for '%s'. ", config);
|
||||
}
|
||||
|
||||
private Map<String, Object> getConfigurationWithBootstrapServer(
|
||||
Map<String, Object> configuration, String bootstrapServersConfig) {
|
||||
if (ObjectUtils.isEmpty(configuration.get(bootstrapServersConfig))) {
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class KafkaBinderConfigurationPropertiesTest {
|
||||
|
||||
@Test
|
||||
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
|
||||
KafkaProperties kafkaProperties = new KafkaProperties();
|
||||
kafkaProperties.getConsumer().setGroupId("group1");
|
||||
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
|
||||
new KafkaBinderConfigurationProperties(kafkaProperties);
|
||||
|
||||
Map<String, Object> mergedConsumerConfiguration =
|
||||
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
|
||||
|
||||
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
|
||||
KafkaProperties kafkaProperties = new KafkaProperties();
|
||||
kafkaProperties.getConsumer().setEnableAutoCommit(true);
|
||||
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
|
||||
new KafkaBinderConfigurationProperties(kafkaProperties);
|
||||
|
||||
Map<String, Object> mergedConsumerConfiguration =
|
||||
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
|
||||
|
||||
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
|
||||
KafkaProperties kafkaProperties = new KafkaProperties();
|
||||
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
|
||||
new KafkaBinderConfigurationProperties(kafkaProperties);
|
||||
kafkaBinderConfigurationProperties
|
||||
.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
|
||||
|
||||
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
|
||||
|
||||
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
|
||||
KafkaProperties kafkaProperties = new KafkaProperties();
|
||||
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
|
||||
new KafkaBinderConfigurationProperties(kafkaProperties);
|
||||
kafkaBinderConfigurationProperties
|
||||
.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
|
||||
|
||||
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
|
||||
|
||||
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
|
||||
KafkaProperties kafkaProperties = new KafkaProperties();
|
||||
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
|
||||
new KafkaBinderConfigurationProperties(kafkaProperties);
|
||||
kafkaBinderConfigurationProperties
|
||||
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
|
||||
|
||||
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
|
||||
|
||||
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
|
||||
KafkaProperties kafkaProperties = new KafkaProperties();
|
||||
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
|
||||
new KafkaBinderConfigurationProperties(kafkaProperties);
|
||||
kafkaBinderConfigurationProperties
|
||||
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
|
||||
|
||||
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
|
||||
|
||||
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.2.0.RC1</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -569,10 +569,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
|
||||
(Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(),
|
||||
() -> streamsBuilder)
|
||||
.getRawBeanDefinition();
|
||||
final String beanNamePostFix = method.getDeclaringClass().getSimpleName() + "-" + method.getName();
|
||||
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(
|
||||
"stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
|
||||
"stream-builder-" + beanNamePostFix, streamsBuilderBeanDefinition);
|
||||
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean(
|
||||
"&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
|
||||
"&stream-builder-" + beanNamePostFix, StreamsBuilderFactoryBean.class);
|
||||
this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
|
||||
}
|
||||
|
||||
|
||||
@@ -155,7 +155,7 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
receiveAndValidate(context);
|
||||
// Assertions on StreamBuilderFactoryBean
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
|
||||
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
|
||||
.getBean("&stream-builder-WordCountProcessorApplication-process", StreamsBuilderFactoryBean.class);
|
||||
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
ReadOnlyWindowStore<Object, Object> store = kafkaStreams
|
||||
.store("foo-WordCounts", QueryableStoreTypes.windowStore());
|
||||
|
||||
@@ -108,7 +108,7 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
|
||||
receiveAndValidateFoo(context);
|
||||
// Assertions on StreamBuilderFactoryBean
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
|
||||
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
|
||||
.getBean("&stream-builder-ProductCountApplication-process", StreamsBuilderFactoryBean.class);
|
||||
CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean,
|
||||
"cleanupConfig", CleanupConfig.class);
|
||||
assertThat(cleanup.cleanupOnStart()).isFalse();
|
||||
|
||||
@@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.integration;
|
||||
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class MultiProcessorsWithSameNameTests {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
|
||||
"counts");
|
||||
|
||||
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
|
||||
.getEmbeddedKafka();
|
||||
|
||||
@Test
|
||||
public void testBinderStartsSuccessfullyWhenTwoProcessorsWithSameNamesArePresent() {
|
||||
SpringApplication app = new SpringApplication(
|
||||
MultiProcessorsWithSameNameTests.WordCountProcessorApplication.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.input.destination=words",
|
||||
"--spring.cloud.stream.bindings.input-2.destination=words",
|
||||
"--spring.cloud.stream.bindings.output.destination=counts",
|
||||
"--spring.cloud.stream.bindings.output.contentType=application/json",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id=basic-word-count",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id=basic-word-count-1",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString(),
|
||||
"--spring.cloud.stream.kafka.streams.binder.zkNodes="
|
||||
+ embeddedKafka.getZookeeperConnectionString())) {
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean1 = context
|
||||
.getBean("&stream-builder-Foo-process", StreamsBuilderFactoryBean.class);
|
||||
assertThat(streamsBuilderFactoryBean1).isNotNull();
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean2 = context
|
||||
.getBean("&stream-builder-Bar-process", StreamsBuilderFactoryBean.class);
|
||||
assertThat(streamsBuilderFactoryBean2).isNotNull();
|
||||
}
|
||||
}
|
||||
|
||||
@EnableBinding(KafkaStreamsProcessorX.class)
|
||||
@EnableAutoConfiguration
|
||||
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
|
||||
static class WordCountProcessorApplication {
|
||||
|
||||
@Component
|
||||
static class Foo {
|
||||
@StreamListener
|
||||
public void process(@Input("input-1") KStream<Object, String> input) {
|
||||
}
|
||||
}
|
||||
|
||||
//Second class with a stub processor that has the same name as above ("process")
|
||||
@Component
|
||||
static class Bar {
|
||||
@StreamListener
|
||||
public void process(@Input("input-2") KStream<Object, String> input) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface KafkaStreamsProcessorX {
|
||||
|
||||
@Input("input-1")
|
||||
KStream<?, ?> input1();
|
||||
|
||||
@Input("input-2")
|
||||
KStream<?, ?> input2();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.2.0.RC1</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -314,7 +314,9 @@ public class KafkaMessageChannelBinder extends
|
||||
List<PartitionInfo> partitionsFor = producer
|
||||
.partitionsFor(destination.getName());
|
||||
producer.close();
|
||||
((DisposableBean) producerFB).destroy();
|
||||
if (this.transactionManager == null) {
|
||||
((DisposableBean) producerFB).destroy();
|
||||
}
|
||||
return partitionsFor;
|
||||
}, destination.getName());
|
||||
this.topicsInUse.put(destination.getName(),
|
||||
|
||||
@@ -105,7 +105,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
assertThat(consumerConfigs.get("value.deserializer"))
|
||||
.isEqualTo(LongDeserializer.class);
|
||||
assertThat(consumerConfigs.get("value.serialized")).isNull();
|
||||
assertThat(consumerConfigs.get("group.id")).isEqualTo("groupIdFromBootConfig");
|
||||
assertThat(consumerConfigs.get("group.id")).isEqualTo("test");
|
||||
assertThat(consumerConfigs.get("auto.offset.reset")).isEqualTo("earliest");
|
||||
assertThat((((List<String>) consumerConfigs.get("bootstrap.servers"))
|
||||
.containsAll(bootstrapServers))).isTrue();
|
||||
|
||||
Reference in New Issue
Block a user