AdminClient customizer (#1023)

* AdminClient customizer

Provide the ability for applications to customize AdminClient by
introducing a new interface AdminClientConfigCustomizer.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1014

* Addressing PR review comments
This commit is contained in:
Soby Chacko
2021-02-03 10:25:03 -05:00
committed by GitHub
parent cadb5422cc
commit 32939e30fe
12 changed files with 90 additions and 23 deletions

View File

@@ -786,3 +786,20 @@ For example, if you want to gain access to a bean that is defined at the applica
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.
Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.
[[admin-client-config-customization]]
=== Customizing AdminClient Configuration
As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`.
AdminClientConfigCustomizer's configure method provides access to the admin client properties, using which you can define further customization.
Binder's Kafka topic provisioner gives the highest precedence for the properties given through this customizer.
Here is an example of providing this customizer bean.
```
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
```

View File

@@ -0,0 +1,31 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.provisioning;
import java.util.Map;
/**
* Customizer for configuring AdminClient.
*
* @author Soby Chacko
* @since 3.1.2
*/
@FunctionalInterface
public interface AdminClientConfigCustomizer {
void configure(Map<String, Object> adminClientProperties);
}

View File

@@ -105,16 +105,23 @@ public class KafkaTopicProvisioner implements
* Create an instance.
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the
* @param adminClientConfigCustomizer to customize {@link AdminClient}.
* {@link AdminClient}.
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
this.adminClientProperties = kafkaProperties.buildAdminProperties();
normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
kafkaBinderConfigurationProperties);
// If the application provides an AdminConfig customizer
// and overrides properties, that takes precedence.
if (adminClientConfigCustomizer != null) {
adminClientConfigCustomizer.configure(this.adminClientProperties);
}
}
/**
@@ -151,7 +158,7 @@ public class KafkaTopicProvisioner implements
logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, properties.getPartitionCount(), false,
properties.getExtension().getTopic());
int partitions = 0;

View File

@@ -42,6 +42,8 @@ import static org.assertj.core.api.Assertions.fail;
*/
public class KafkaTopicProvisionerTests {
AdminClientConfigCustomizer adminClientConfigCustomizer = adminClientProperties -> adminClientProperties.put("foo", "bar");
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenExceptServers() throws Exception {
@@ -58,7 +60,7 @@ public class KafkaTopicProvisionerTests {
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:9092");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
bootConfig, adminClientConfigCustomizer);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
@@ -67,6 +69,7 @@ public class KafkaTopicProvisionerTests {
assertThat(
((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
.isEqualTo("localhost:1234");
assertThat(configs.get("foo")).isEqualTo("bar");
adminClient.close();
}
@@ -86,7 +89,7 @@ public class KafkaTopicProvisionerTests {
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:1234");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
bootConfig, adminClientConfigCustomizer);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
@@ -106,7 +109,7 @@ public class KafkaTopicProvisionerTests {
binderConfig.getConfiguration().put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"localhost:1234");
try {
new KafkaTopicProvisioner(binderConfig, bootConfig);
new KafkaTopicProvisioner(binderConfig, bootConfig, adminClientConfigCustomizer);
fail("Expected illegal state");
}
catch (IllegalStateException e) {

View File

@@ -18,11 +18,13 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
@@ -46,8 +48,8 @@ public class GlobalKTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean

View File

@@ -16,10 +16,12 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
@@ -44,9 +46,9 @@ public class KStreamBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(kafkaStreamsBinderConfigurationProperties,
kafkaProperties);
kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean

View File

@@ -18,11 +18,13 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
@@ -46,8 +48,8 @@ public class KTableBinderConfiguration {
@Bean
public KafkaTopicProvisioner provisioningProvider(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
KafkaProperties kafkaProperties, ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean

View File

@@ -38,6 +38,7 @@ import org.springframework.cloud.stream.binder.kafka.KafkaNullConverter;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
@@ -104,8 +105,10 @@ public class KafkaBinderConfiguration {
@Bean
KafkaTopicProvisioner provisioningProvider(
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, this.kafkaProperties);
KafkaBinderConfigurationProperties configurationProperties,
ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer) {
return new KafkaTopicProvisioner(configurationProperties,
this.kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@SuppressWarnings("unchecked")

View File

@@ -65,7 +65,7 @@ public class AutoCreateTopicDisabledTests {
configurationProperties.setAutoCreateTopics(false);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, null);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
@@ -97,7 +97,7 @@ public class AutoCreateTopicDisabledTests {
configurationProperties.getConfiguration().put("max.block.ms", "3000");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, null);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(1);
final RetryTemplate metadataRetryOperations = new RetryTemplate();
metadataRetryOperations.setRetryPolicy(simpleRetryPolicy);

View File

@@ -209,7 +209,7 @@ public class KafkaBinderTests extends
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
@@ -232,7 +232,7 @@ public class KafkaBinderTests extends
DlqPartitionFunction dlqPartitionFunction, DlqDestinationResolver dlqDestinationResolver) {
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
kafkaBinderConfigurationProperties, new TestKafkaProperties());
kafkaBinderConfigurationProperties, new TestKafkaProperties(), null);
try {
provisioningProvider.afterPropertiesSet();
}
@@ -401,7 +401,7 @@ public class KafkaBinderTests extends
binderConfiguration.setHeaderMapperBeanName("headerMapper");
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
@@ -478,7 +478,7 @@ public class KafkaBinderTests extends
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
@@ -3656,7 +3656,7 @@ public class KafkaBinderTests extends
binderConfiguration.setHeaderMapperBeanName("headerMapper");
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
binderConfiguration, new TestKafkaProperties(), null);
try {
kafkaTopicProvisioner.afterPropertiesSet();
}

View File

@@ -78,7 +78,7 @@ public class KafkaBinderUnitTests {
KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
binderConfigurationProperties, kafkaProperties);
binderConfigurationProperties, kafkaProperties, null);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
binderConfigurationProperties, provisioningProvider);
KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();

View File

@@ -73,7 +73,7 @@ public class KafkaTransactionTests {
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
configurationProperties, kafkaProperties, null);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
final Producer mockProducer = mock(Producer.class);
given(mockProducer.send(any(), any())).willReturn(new SettableListenableFuture<>());