Compare commits

...

10 Commits

Author SHA1 Message Date
buildmaster
383add504a Update SNAPSHOT to 2.2.0.RC1 2019-04-09 18:09:51 +00:00
Oleg Zhurakousky
5f9395a5ec Prepared docs for RC1 2019-04-09 19:23:29 +02:00
Soby Chacko
70eb25d413 Fix failing test 2019-04-09 13:02:57 -04:00
Oleg Zhurakousky
6bc74c6e5c Doc changes related to Rabbit's 'GH-193 Added note on default properties' 2019-04-09 18:56:15 +02:00
Soby Chacko
33603c62f0 Transactional binder producer factory
With a transactional binder, the producer factory should not be destroyed.

Resolves #626
2019-04-08 15:27:06 -04:00
Anshul Mehra
efd46835a1 GH-525: Ignore enable.auto.commit and group.id from merged consumer configuration (#562)
* GH-525: Ignore enable.auto.commit and group.id from merged consumer configuration

Resolves #525

* Update warning messages to be more explicit
2019-04-08 12:44:52 -04:00
Oleg Zhurakousky
3a267bc751 Merge pull request #620 from sobychacko/gh-589
Bean name conflicts (Kafka Streams binder)
2019-03-28 18:33:01 +01:00
Soby Chacko
62e98df0c7 Bean name conflicts (Kafka Streams binder)
When two processors with same name are present in the same application,
there is a bean creation conflict. Fixing that issue.

Add test to verify.
Modify existing tests.

Resolves #589
2019-03-27 16:30:22 -04:00
Matthieu Ghilain
9e156911b4 Fixing typo in documentation
Resolves #619
2019-03-26 18:49:49 +01:00
Oleg Zhurakousky
cd28454818 Updated docs pom back to snapshot 2019-03-25 18:46:05 +01:00
16 changed files with 268 additions and 16 deletions

View File

@@ -132,7 +132,7 @@ If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.
+
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
+
Default: `true`.
@@ -162,6 +162,9 @@ Default: none.
[[kafka-consumer-properties]]
==== Kafka Consumer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
@@ -284,6 +287,9 @@ Default: none (the binder-wide default of 1 is used).
[[kafka-producer-properties]]
==== Kafka Producer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.RC1</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

View File

@@ -112,7 +112,7 @@ If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
In the latter case, if the topics do not exist, the binder fails to start.
+
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
NOTE: This setting is independent of the `auto.create.topics.enable` setting of the broker and does not influence it.
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
+
Default: `true`.
@@ -142,6 +142,9 @@ Default: none.
[[kafka-consumer-properties]]
==== Kafka Consumer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
@@ -264,6 +267,9 @@ Default: none (the binder-wide default of 1 is used).
[[kafka-producer-properties]]
==== Kafka Producer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.

View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.RC1</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.1.4.BUILD-SNAPSHOT</version>
<version>2.1.4.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.2.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>2.2.0.RC1</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.RC1</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.RC1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -25,6 +25,8 @@ import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -56,6 +58,8 @@ public class KafkaBinderConfigurationProperties {
private static final String DEFAULT_KAFKA_CONNECTION_STRING = "localhost:9092";
private final Log logger = LogFactory.getLog(getClass());
private final Transaction transaction = new Transaction();
private final KafkaProperties kafkaProperties;
@@ -529,6 +533,7 @@ public class KafkaBinderConfigurationProperties {
}
}
consumerConfiguration.putAll(this.consumerProperties);
filterStreamManagedConfiguration(consumerConfiguration);
// Override Spring Boot bootstrap server setting if left to default with the value
// configured in the binder
return getConfigurationWithBootstrapServer(consumerConfiguration,
@@ -559,6 +564,25 @@ public class KafkaBinderConfigurationProperties {
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
private void filterStreamManagedConfiguration(Map<String, Object> configuration) {
if (configuration.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
&& configuration.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals(true)) {
logger.warn(constructIgnoredConfigMessage(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) +
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "=true is not supported by the Kafka binder");
configuration.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
if (configuration.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
logger.warn(constructIgnoredConfigMessage(ConsumerConfig.GROUP_ID_CONFIG) +
"Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify " +
"the group instead of " + ConsumerConfig.GROUP_ID_CONFIG);
configuration.remove(ConsumerConfig.GROUP_ID_CONFIG);
}
}
private String constructIgnoredConfigMessage(String config) {
return String.format("Ignoring provided value(s) for '%s'. ", config);
}
private Map<String, Object> getConfigurationWithBootstrapServer(
Map<String, Object> configuration, String bootstrapServersConfig) {
if (ObjectUtils.isEmpty(configuration.get(bootstrapServersConfig))) {

View File

@@ -0,0 +1,108 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import static org.assertj.core.api.Assertions.assertThat;
public class KafkaBinderConfigurationPropertiesTest {
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setGroupId("group1");
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setEnableAutoCommit(true);
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.RC1</version>
</parent>
<properties>

View File

@@ -569,10 +569,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
(Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(),
() -> streamsBuilder)
.getRawBeanDefinition();
final String beanNamePostFix = method.getDeclaringClass().getSimpleName() + "-" + method.getName();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(
"stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
"stream-builder-" + beanNamePostFix, streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean(
"&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
"&stream-builder-" + beanNamePostFix, StreamsBuilderFactoryBean.class);
this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
}

View File

@@ -155,7 +155,7 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
receiveAndValidate(context);
// Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
.getBean("&stream-builder-WordCountProcessorApplication-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyWindowStore<Object, Object> store = kafkaStreams
.store("foo-WordCounts", QueryableStoreTypes.windowStore());

View File

@@ -108,7 +108,7 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
receiveAndValidateFoo(context);
// Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
.getBean("&stream-builder-ProductCountApplication-process", StreamsBuilderFactoryBean.class);
CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean,
"cleanupConfig", CleanupConfig.class);
assertThat(cleanup.cleanupOnStart()).isFalse();

View File

@@ -0,0 +1,105 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.stereotype.Component;
import static org.assertj.core.api.Assertions.assertThat;
public class MultiProcessorsWithSameNameTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
@Test
public void testBinderStartsSuccessfullyWhenTwoProcessorsWithSameNamesArePresent() {
SpringApplication app = new SpringApplication(
MultiProcessorsWithSameNameTests.WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.input-2.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
"--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id=basic-word-count",
"--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id=basic-word-count-1",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes="
+ embeddedKafka.getZookeeperConnectionString())) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean1 = context
.getBean("&stream-builder-Foo-process", StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean1).isNotNull();
StreamsBuilderFactoryBean streamsBuilderFactoryBean2 = context
.getBean("&stream-builder-Bar-process", StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean2).isNotNull();
}
}
@EnableBinding(KafkaStreamsProcessorX.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KafkaStreamsApplicationSupportProperties.class)
static class WordCountProcessorApplication {
@Component
static class Foo {
@StreamListener
public void process(@Input("input-1") KStream<Object, String> input) {
}
}
//Second class with a stub processor that has the same name as above ("process")
@Component
static class Bar {
@StreamListener
public void process(@Input("input-2") KStream<Object, String> input) {
}
}
}
interface KafkaStreamsProcessorX {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
KStream<?, ?> input2();
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.RC1</version>
</parent>
<dependencies>

View File

@@ -314,7 +314,9 @@ public class KafkaMessageChannelBinder extends
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
((DisposableBean) producerFB).destroy();
if (this.transactionManager == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
}, destination.getName());
this.topicsInUse.put(destination.getName(),

View File

@@ -105,7 +105,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
assertThat(consumerConfigs.get("value.deserializer"))
.isEqualTo(LongDeserializer.class);
assertThat(consumerConfigs.get("value.serialized")).isNull();
assertThat(consumerConfigs.get("group.id")).isEqualTo("groupIdFromBootConfig");
assertThat(consumerConfigs.get("group.id")).isEqualTo("test");
assertThat(consumerConfigs.get("auto.offset.reset")).isEqualTo("earliest");
assertThat((((List<String>) consumerConfigs.get("bootstrap.servers"))
.containsAll(bootstrapServers))).isTrue();