Customizing producer/consumer factories (#963)

* Customizing producer/consumer factories

Adds hooks, in the form of producer and consumer config
customizers, for performing advanced configuration on the producer
and consumer factories.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/960

* Addressing PR review comments

* Further PR updates
Soby Chacko
2020-09-23 13:21:31 -04:00
parent ab5287f7a9
commit eb5840d324
6 changed files with 284 additions and 1 deletion

View File

@@ -708,3 +708,17 @@ public interface KafkaBindingRebalanceListener {
====
You cannot set the `resetOffsets` consumer property to `true` when you provide a rebalance listener.
[[consumer-producer-config-customizer]]
=== Customizing Consumer and Producer configuration
If you want advanced customization of the consumer and producer configuration that is used for creating the `ConsumerFactory` and `ProducerFactory` in Kafka,
you can implement the following customizers.

* `ConsumerConfigCustomizer`
* `ProducerConfigCustomizer`

Both of these interfaces provide a way to configure the configuration map used for the consumer and producer properties.
For example, if you want to gain access to a bean that is defined at the application level, you can inject it into your customizer implementation and use it inside the `configure` method.
When the binder discovers that these customizers are available as beans, it invokes the `configure` method right before creating the consumer and producer factories.
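The following is a simple sketch of such customizers registered as beans (the `CustomizerConfiguration` class and the `MyInterceptorDependency` bean are illustrative names, not part of the binder):

[source, java]
----
@Configuration
public class CustomizerConfiguration {

	@Bean
	public ConsumerConfigCustomizer consumerConfigCustomizer(MyInterceptorDependency dependency) {
		// Called by the binder right before the consumer factory is created.
		return consumerProperties -> {
			// Adjust any standard Kafka consumer property.
			consumerProperties.put("max.poll.records", 50);
			// Expose an application-level bean (for example, to a ConsumerInterceptor)
			// through the configuration map.
			consumerProperties.put("my.interceptor.dependency", dependency);
		};
	}

	@Bean
	public ProducerConfigCustomizer producerConfigCustomizer() {
		// Called by the binder right before the producer factory is created.
		return producerProperties -> producerProperties.put("linger.ms", 5);
	}
}
----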

View File

@@ -65,6 +65,8 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
import org.springframework.cloud.stream.binder.kafka.config.ConsumerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.config.ProducerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties.StandardHeaders;
@@ -225,6 +227,10 @@ public class KafkaMessageChannelBinder extends
private ClientFactoryCustomizer clientFactoryCustomizer;
private ProducerConfigCustomizer producerConfigCustomizer;
private ConsumerConfigCustomizer consumerConfigCustomizer;
public KafkaMessageChannelBinder(
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
@@ -518,6 +524,9 @@ public class KafkaMessageChannelBinder extends
if (!ObjectUtils.isEmpty(kafkaProducerProperties.getConfiguration())) {
props.putAll(kafkaProducerProperties.getConfiguration());
}
if (this.producerConfigCustomizer != null) {
this.producerConfigCustomizer.configure(props);
}
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>(
props);
if (transactionIdPrefix != null) {
@@ -1309,6 +1318,9 @@ public class KafkaMessageChannelBinder extends
consumerProperties.getExtension().getStartOffset().name());
}
if (this.consumerConfigCustomizer != null) {
this.consumerConfigCustomizer.configure(props);
}
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(props);
factory.setBeanName(beanName);
if (this.clientFactoryCustomizer != null) {
@@ -1361,6 +1373,14 @@ public class KafkaMessageChannelBinder extends
return stringWriter.getBuffer().toString();
}
public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
this.consumerConfigCustomizer = consumerConfigCustomizer;
}
public void setProducerConfigCustomizer(ProducerConfigCustomizer producerConfigCustomizer) {
this.producerConfigCustomizer = producerConfigCustomizer;
}
private final class ProducerConfigurationMessageHandler
extends KafkaProducerMessageHandler<byte[], byte[]> {

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.config;

import java.util.Map;

/**
 * This customizer is called by the binder to customize the consumer configuration
 * used by the Kafka consumer factory.
 *
 * @author Soby Chacko
 * @since 3.0.9
 */
@FunctionalInterface
public interface ConsumerConfigCustomizer {

	void configure(Map<String, Object> consumerProperties);

}

View File

@@ -120,7 +120,10 @@ public class KafkaBinderConfiguration {
ObjectProvider<KafkaBindingRebalanceListener> rebalanceListener,
ObjectProvider<DlqPartitionFunction> dlqPartitionFunction,
ObjectProvider<DlqDestinationResolver> dlqDestinationResolver,
ObjectProvider<ClientFactoryCustomizer> clientFactoryCustomizer) {
ObjectProvider<ClientFactoryCustomizer> clientFactoryCustomizer,
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer
) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider,
listenerContainerCustomizer, sourceCustomizer, rebalanceListener.getIfUnique(),
@@ -131,6 +134,8 @@ public class KafkaBinderConfiguration {
kafkaMessageChannelBinder.setProducerMessageHandlerCustomizer(messageHandlerCustomizer);
kafkaMessageChannelBinder.setConsumerEndpointCustomizer(consumerCustomizer);
kafkaMessageChannelBinder.setClientFactoryCustomizer(clientFactoryCustomizer.getIfUnique());
kafkaMessageChannelBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
kafkaMessageChannelBinder.setProducerConfigCustomizer(producerConfigCustomizer.getIfUnique());
return kafkaMessageChannelBinder;
}
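Note that the binder resolves these customizers with `getIfUnique()`, so at most one bean of each customizer type is applied. A minimal sketch of an application-level registration (the `AppCustomizers` class name is illustrative, not part of this commit):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.cloud.stream.binder.kafka.config.ConsumerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.config.ProducerConfigCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppCustomizers {

	@Bean
	public ConsumerConfigCustomizer consumerConfigCustomizer() {
		// Adjust standard Kafka consumer properties before the ConsumerFactory is created.
		return consumerProperties -> consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 250);
	}

	@Bean
	public ProducerConfigCustomizer producerConfigCustomizer() {
		// Adjust standard Kafka producer properties before the ProducerFactory is created.
		return producerProperties -> producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
	}
}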

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.config;

import java.util.Map;

/**
 * This customizer is called by the binder to customize the producer configuration
 * used by the Kafka producer factory.
 *
 * @author Soby Chacko
 * @since 3.0.9
 */
@FunctionalInterface
public interface ProducerConfigCustomizer {

	void configure(Map<String, Object> producerProperties);

}

View File

@@ -0,0 +1,180 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.kafka.config.ConsumerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.config.ProducerConfigCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*
* Based on: https://github.com/spring-projects/spring-kafka/issues/897#issuecomment-466060097
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"spring.cloud.function.definition=process",
"spring.cloud.stream.bindings.process-in-0.group=KafkaConfigCustomizationTests.group"})
@DirtiesContext
public class KafkaConfigCustomizationTests {
private static final String KAFKA_BROKERS_PROPERTY = "spring.cloud.stream.kafka.binder.brokers";
@ClassRule
public static EmbeddedKafkaRule kafkaEmbedded = new EmbeddedKafkaRule(1, true);
static final CountDownLatch countDownLatch = new CountDownLatch(2);
@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY,
kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
}
@AfterClass
public static void clean() {
System.clearProperty(KAFKA_BROKERS_PROPERTY);
}
@Test
public void testBothConsumerAndProducerConfigsCanBeCustomized() throws InterruptedException {
Map<String, Object> producerProps = KafkaTestUtils
.producerProps(kafkaEmbedded.getEmbeddedKafka());
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(
new DefaultKafkaProducerFactory<>(producerProps));
template.send("process-in-0", "test-foo");
template.flush();
assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@SpringBootApplication
public static class ConfigCustomizerTestConfig {
@Bean
public Function<String, String> process() {
return payload -> payload;
}
@Bean
public ConsumerConfigCustomizer consumerConfigCustomizer() {
return consumerProperties -> {
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("foo.bean", foo());
};
}
@Bean
public ProducerConfigCustomizer producerConfigCustomizer() {
return producerProperties -> {
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("foo.bean", foo());
};
}
@Bean
public Foo foo() {
return new Foo();
}
}
public static class Foo {
public void foo(String what) {
KafkaConfigCustomizationTests.countDownLatch.countDown();
}
}
public static class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Foo foo;
@Override
public void configure(Map<String, ?> configs) {
this.foo = (Foo) configs.get("foo.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.foo.foo("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
public static class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private Foo foo;
@Override
public void configure(Map<String, ?> configs) {
this.foo = (Foo) configs.get("foo.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.foo.foo("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
}