Altering existing topics only allowed if opt-in

Altering existing topic configurations based on a new binder property.
Existing topic configurations are only modified if `autoAlterTopics` is
enabled. By default, this is disabled.

Adding tests to verify.

Docs.

Logging level changes.

Checkstyle fixes
This commit is contained in:
Taras Danylchuk
2020-11-17 17:33:01 -05:00
committed by Soby Chacko
parent 6be541c070
commit 526627854a
6 changed files with 179 additions and 72 deletions

View File

@@ -147,6 +147,11 @@ If set to `false`, the binder relies on the partition size of the topic being al
If the partition count of the target topic is smaller than the expected value, the binder fails to start.
+
Default: `false`.
spring.cloud.stream.kafka.binder.autoAlterTopics::
If set to `true`, the binder alters the configuration of an existing destination topic when its current settings differ from the topic properties configured for the binding.
If set to `false`, the binder leaves the configuration of an existing topic untouched and relies on its current settings.
+
Default: `false`.
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
Enables transactions in the binder. See `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.

View File

@@ -97,6 +97,8 @@ public class KafkaBinderConfigurationProperties {
private boolean autoCreateTopics = true;
private boolean autoAlterTopics;
private boolean autoAddPartitions;
private boolean considerDownWhenAnyPartitionHasNoLeader;
@@ -289,6 +291,14 @@ public class KafkaBinderConfigurationProperties {
this.autoCreateTopics = autoCreateTopics;
}
/**
 * Whether the binder may alter the configuration of existing destination topics.
 * When {@code false} (the default), existing topic configs are left untouched.
 */
public boolean isAutoAlterTopics() {
	// use this. qualifier for consistency with the other accessors in this class
	return this.autoAlterTopics;
}

public void setAutoAlterTopics(boolean autoAlterTopics) {
	this.autoAlterTopics = autoAlterTopics;
}
public boolean isAutoAddPartitions() {
return this.autoAddPartitions;
}

View File

@@ -169,7 +169,7 @@ public class KafkaTopicProvisioner implements
topicDescriptions.putAll(all.get(this.operationTimeout, TimeUnit.SECONDS));
}
catch (Exception ex) {
throw new ProvisioningException("Problems encountered with partitions finding", ex);
throw new ProvisioningException("Problems encountered with partitions finding for: " + name, ex);
}
return null;
});
@@ -242,7 +242,7 @@ public class KafkaTopicProvisioner implements
}
}
catch (Exception ex) {
throw new ProvisioningException("provisioning exception", ex);
throw new ProvisioningException("Provisioning exception encountered for " + name, ex);
}
}
}
@@ -312,7 +312,7 @@ public class KafkaTopicProvisioner implements
throw (Error) throwable;
}
else {
throw new ProvisioningException("provisioning exception", throwable);
throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
}
}
return new KafkaConsumerDestination(name, partitions, dlqTopic);
@@ -336,7 +336,7 @@ public class KafkaTopicProvisioner implements
else {
// TODO:
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/514#discussion_r241075940
throw new ProvisioningException("Provisioning exception", throwable);
throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
}
}
}
@@ -375,33 +375,8 @@ public class KafkaTopicProvisioner implements
Set<String> names = namesFutures.get(this.operationTimeout, TimeUnit.SECONDS);
if (names.contains(topicName)) {
//check if topic.properties are different from Topic Configuration in Kafka
if (this.configurationProperties.isAutoCreateTopics()) {
ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
DescribeConfigsResult describeConfigsResult = adminClient
.describeConfigs(Collections.singletonList(topicConfigResource));
KafkaFuture<Map<ConfigResource, Config>> topicConfigurationFuture = describeConfigsResult.all();
Map<ConfigResource, Config> topicConfigMap = topicConfigurationFuture
.get(this.operationTimeout, TimeUnit.SECONDS);
Config config = topicConfigMap.get(topicConfigResource);
final List<AlterConfigOp> updatedConfigEntries = topicProperties.getProperties().entrySet().stream()
.filter(propertiesEntry -> {
// Property is new and should be added
if (config.get(propertiesEntry.getKey()) == null) {
return true;
}
else {
// Property changed and should be updated
return !config.get(propertiesEntry.getKey()).value().equals(propertiesEntry.getValue());
}
}).map(propertyEntry -> new ConfigEntry(propertyEntry.getKey(), propertyEntry.getValue()))
.map(configEntry -> new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
.collect(Collectors
.toList());
Map<ConfigResource, Collection<AlterConfigOp>> alterConfigForTopics = new HashMap<>();
alterConfigForTopics.put(topicConfigResource, updatedConfigEntries);
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alterConfigForTopics);
alterConfigsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
if (this.configurationProperties.isAutoAlterTopics()) {
alterTopicConfigsIfNecessary(adminClient, topicName, topicProperties);
}
// only consider minPartitionCount for resizing if autoAddPartitions is true
int effectivePartitionCount = this.configurationProperties
@@ -495,6 +470,43 @@ public class KafkaTopicProvisioner implements
}
}
/**
 * Aligns the broker-side configuration of {@code topicName} with the entries in
 * {@code topicProperties}. The current topic config is described first; an
 * incremental alter-config request is only sent when at least one desired
 * property is either missing on the broker or has a different value there.
 */
private void alterTopicConfigsIfNecessary(AdminClient adminClient,
		String topicName,
		KafkaTopicProperties topicProperties)
		throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
	ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
	// fetch the topic's current broker-side configuration
	Map<ConfigResource, Config> describedConfigs = adminClient
			.describeConfigs(Collections.singletonList(resource))
			.all()
			.get(this.operationTimeout, TimeUnit.SECONDS);
	Config currentConfig = describedConfigs.get(resource);
	final List<AlterConfigOp> changes = topicProperties.getProperties().entrySet().stream()
			.filter(desired -> {
				ConfigEntry existing = currentConfig.get(desired.getKey());
				// keep entries that are new on the broker or whose value differs
				return existing == null || !existing.value().equals(desired.getValue());
			})
			.map(desired -> new AlterConfigOp(
					new ConfigEntry(desired.getKey(), desired.getValue()),
					AlterConfigOp.OpType.SET))
			.collect(Collectors.toList());
	// avoid a round trip to the broker when nothing actually changed
	if (!changes.isEmpty()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Attempting to alter configs " + changes + " for the topic:" + topicName);
		}
		Map<ConfigResource, Collection<AlterConfigOp>> alterConfigForTopics = new HashMap<>();
		alterConfigForTopics.put(resource, changes);
		adminClient.incrementalAlterConfigs(alterConfigForTopics)
				.all()
				.get(this.operationTimeout, TimeUnit.SECONDS);
	}
}
/**
* Check that the topic has the expected number of partitions and return the partition information.
* @param partitionCount the expected count.

View File

@@ -14,33 +14,19 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration;
package org.springframework.cloud.stream.binder.kafka.integration.topic.configs;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
@@ -48,20 +34,20 @@ import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Heiko Does
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
@SpringBootTest(
classes = BaseKafkaBinderTopicPropertiesUpdateTest.TopicAutoConfigsTestConfig.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.cloud.stream.kafka.bindings.standard-out.producer.topic.properties.retention.ms=9001",
"spring.cloud.stream.kafka.default.producer.topic.properties.retention.ms=-1",
"spring.cloud.stream.kafka.bindings.standard-in.consumer.topic.properties.retention.ms=9001",
"spring.cloud.stream.kafka.default.consumer.topic.properties.retention.ms=-1"
})
@DirtiesContext
public class KafkaBinderTopicPropertiesUpdateTest {
public abstract class BaseKafkaBinderTopicPropertiesUpdateTest {
private static final String KAFKA_BROKERS_PROPERTY = "spring.cloud.stream.kafka.binder.brokers";
@@ -79,30 +65,9 @@ public class KafkaBinderTopicPropertiesUpdateTest {
System.clearProperty(KAFKA_BROKERS_PROPERTY);
}
@Autowired
private ConfigurableApplicationContext context;
@Test
public void testKafkaBinderUpdateTopicConfiguration() throws Exception {
Map<String, Object> adminClientConfig = new HashMap<>();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
AdminClient adminClient = AdminClient.create(adminClientConfig);
ConfigResource standardInConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "standard-in");
ConfigResource standardOutConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "standard-out");
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays
.asList(standardInConfigResource, standardOutConfigResource));
KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = describeConfigsResult.all();
Map<ConfigResource, Config> configResourceConfigMap = kafkaFuture.get(3, TimeUnit.SECONDS);
Config standardInTopicConfig = configResourceConfigMap.get(standardInConfigResource);
assertThat(standardInTopicConfig.get("retention.ms").value()).isEqualTo("9001");
Config standardOutTopicConfig = configResourceConfigMap.get(standardOutConfigResource);
assertThat(standardOutTopicConfig.get("retention.ms").value()).isEqualTo("9001");
}
@EnableBinding(CustomBindingForTopicPropertiesUpdateTesting.class)
@EnableAutoConfiguration
public static class KafkaMetricsTestConfig {
public static class TopicAutoConfigsTestConfig {
@StreamListener("standard-in")
@SendTo("standard-out")

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration.topic.configs;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Taras Danylchuk
*/
/**
 * Verifies that with {@code autoAlterTopics} left at its default ({@code false})
 * the binder does NOT apply the binding-level {@code retention.ms} overrides:
 * both topics keep the broker default retention of 604800000 ms (7 days).
 */
public class DisabledKafkaBinderTopicPropertiesUpdateTest extends BaseKafkaBinderTopicPropertiesUpdateTest {

	@Test
	public void testKafkaBinderShouldNotUpdateTopicConfigurationOnDisabledFeature() throws Exception {
		Map<String, Object> adminClientConfig = new HashMap<>();
		adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
				kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
		// AdminClient is AutoCloseable; close it so the test does not leak client threads
		try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
			ConfigResource standardInConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "standard-in");
			ConfigResource standardOutConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "standard-out");
			DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays
					.asList(standardInConfigResource, standardOutConfigResource));
			KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = describeConfigsResult.all();
			Map<ConfigResource, Config> configResourceConfigMap = kafkaFuture.get(3, TimeUnit.SECONDS);
			// broker default retention must be untouched when the feature is disabled
			Config standardInTopicConfig = configResourceConfigMap.get(standardInConfigResource);
			assertThat(standardInTopicConfig.get("retention.ms").value()).isEqualTo("604800000");
			Config standardOutTopicConfig = configResourceConfigMap.get(standardOutConfigResource);
			assertThat(standardOutTopicConfig.get("retention.ms").value()).isEqualTo("604800000");
		}
	}
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration.topic.configs;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Heiko Does
*/
@TestPropertySource(properties = "spring.cloud.stream.kafka.binder.autoAlterTopics=true")
public class KafkaBinderTopicPropertiesUpdateTest extends BaseKafkaBinderTopicPropertiesUpdateTest {

	@Test
	public void testKafkaBinderUpdateTopicConfiguration() throws Exception {
		Map<String, Object> adminClientConfig = new HashMap<>();
		adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
				kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
		// AdminClient is AutoCloseable; close it so the test does not leak client threads
		try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
			ConfigResource standardInConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "standard-in");
			ConfigResource standardOutConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "standard-out");
			DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays
					.asList(standardInConfigResource, standardOutConfigResource));
			KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = describeConfigsResult.all();
			Map<ConfigResource, Config> configResourceConfigMap = kafkaFuture.get(3, TimeUnit.SECONDS);
			// with autoAlterTopics=true the binding-level retention.ms=9001 must be applied
			Config standardInTopicConfig = configResourceConfigMap.get(standardInConfigResource);
			assertThat(standardInTopicConfig.get("retention.ms").value()).isEqualTo("9001");
			Config standardOutTopicConfig = configResourceConfigMap.get(standardOutConfigResource);
			assertThat(standardOutTopicConfig.get("retention.ms").value()).isEqualTo("9001");
		}
	}
}