Prodcer/Consumer config customizer changes

* Provide binding and destination names to the configure method
  in ProducerConfigCustomizer and ConsumerConfigCustomizer
  so that those can be used in the customization.
* Modify tests and docs

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/996
This commit is contained in:
Soby Chacko
2020-12-03 15:46:51 -05:00
committed by Gary Russell
parent 675c2e4940
commit 42c9af019e
9 changed files with 39 additions and 29 deletions

View File

@@ -768,3 +768,5 @@ you can implement the following customizers.
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the `configure` method.
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.
Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.

View File

@@ -265,7 +265,7 @@ public class KafkaMessageChannelBinder extends
if (StringUtils.hasText(txId)) {
this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
txId, new ExtendedProducerProperties<>(configurationProperties
.getTransaction().getProducer().getExtension()), txId + ".producer"));
.getTransaction().getProducer().getExtension()), txId + ".producer", null));
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
}
else {
@@ -333,6 +333,7 @@ public class KafkaMessageChannelBinder extends
@Override
public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
bindingNameHolder.set(channelName);
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
@@ -383,7 +384,8 @@ public class KafkaMessageChannelBinder extends
producerProperties.getExtension().getTransactionManager());
final ProducerFactory<byte[], byte[]> producerFB = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null, producerProperties, destination.getName() + ".producer");
: getProducerFactory(null, producerProperties, destination.getName() + ".producer",
destination.getName());
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
Producer<byte[], byte[]> producer = producerFB.createProducer();
@@ -520,7 +522,7 @@ public class KafkaMessageChannelBinder extends
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName, String destination) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -560,7 +562,8 @@ public class KafkaMessageChannelBinder extends
props.putAll(kafkaProducerProperties.getConfiguration());
}
if (this.producerConfigCustomizer != null) {
this.producerConfigCustomizer.configure(props);
this.producerConfigCustomizer.configure(props, bindingNameHolder.get(), destination);
bindingNameHolder.remove();
}
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>(
props);
@@ -600,7 +603,7 @@ public class KafkaMessageChannelBinder extends
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString()
: group;
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".consumer");
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".consumer", destination.getName());
int partitionCount = extendedConsumerProperties.getInstanceCount()
* extendedConsumerProperties.getConcurrency();
@@ -950,7 +953,8 @@ public class KafkaMessageChannelBinder extends
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString()
: group;
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".polled.consumer");
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".polled.consumer",
destination.getName());
String[] topics = extendedConsumerProperties.isMultiplex()
? StringUtils.commaDelimitedListToStringArray(destination.getName())
: new String[] { destination.getName() };
@@ -1121,7 +1125,7 @@ public class KafkaMessageChannelBinder extends
? transMan.getProducerFactory()
: getProducerFactory(null,
new ExtendedProducerProperties<>(dlqProducerProperties),
destination.getName() + ".dlq.producer");
destination.getName() + ".dlq.producer", destination.getName());
final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(
producerFactory);
@@ -1345,7 +1349,7 @@ public class KafkaMessageChannelBinder extends
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous,
String consumerGroup, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties,
String beanName) {
String beanName, String destination) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
@@ -1379,7 +1383,7 @@ public class KafkaMessageChannelBinder extends
}
if (this.consumerConfigCustomizer != null) {
this.consumerConfigCustomizer.configure(props);
this.consumerConfigCustomizer.configure(props, bindingNameHolder.get(), destination);
}
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(props);
factory.setBeanName(beanName);

View File

@@ -28,5 +28,5 @@ import java.util.Map;
@FunctionalInterface
public interface ConsumerConfigCustomizer {
void configure(Map<String, Object> consumerProperties);
void configure(Map<String, Object> consumerProperties, String bindingName, String destination);
}

View File

@@ -28,5 +28,5 @@ import java.util.Map;
@FunctionalInterface
public interface ProducerConfigCustomizer {
void configure(Map<String, Object> consumerProperties);
void configure(Map<String, Object> producerProperties, String bindingName, String destination);
}

View File

@@ -65,10 +65,10 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class
.getDeclaredMethod("getProducerFactory", String.class,
ExtendedProducerProperties.class, String.class);
ExtendedProducerProperties.class, String.class, String.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties, "foo.producer");
.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties, "foo.producer", "foo");
Field producerFactoryConfigField = ReflectionUtils
.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
@@ -88,12 +88,12 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
.containsAll(bootstrapServers))).isTrue();
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class
.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
String.class, ExtendedConsumerProperties.class, String.class);
String.class, ExtendedConsumerProperties.class, String.class, String.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties, "test.consumer");
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties, "test.consumer", "test");
Field consumerFactoryConfigField = ReflectionUtils
.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);

View File

@@ -70,10 +70,10 @@ public class KafkaBinderConfigurationPropertiesTest {
kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class
.getDeclaredMethod("getProducerFactory", String.class,
ExtendedProducerProperties.class, String.class);
ExtendedProducerProperties.class, String.class, String.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, "bar", producerProperties, "bar.producer");
.invoke(this.kafkaMessageChannelBinder, "bar", producerProperties, "bar.producer", "bar");
Field producerFactoryConfigField = ReflectionUtils
.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
@@ -100,12 +100,12 @@ public class KafkaBinderConfigurationPropertiesTest {
.contains("10.98.09.199:9082"))).isTrue();
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class
.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
String.class, ExtendedConsumerProperties.class, String.class);
String.class, ExtendedConsumerProperties.class, String.class, String.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties, "test.consumer");
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties, "test.consumer", "test");
Field consumerFactoryConfigField = ReflectionUtils
.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);

View File

@@ -86,17 +86,17 @@ public class KafkaBinderUnitTests {
consumerProps);
Method method = KafkaMessageChannelBinder.class.getDeclaredMethod(
"createKafkaConsumerFactory", boolean.class, String.class,
ExtendedConsumerProperties.class, String.class);
ExtendedConsumerProperties.class, String.class, String.class);
method.setAccessible(true);
// test default for anon
Object factory = method.invoke(binder, true, "foo-1", ecp, "foo.consumer");
Object factory = method.invoke(binder, true, "foo-1", ecp, "foo.consumer", "foo");
Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("latest");
// test default for named
factory = method.invoke(binder, false, "foo-2", ecp, "foo.consumer");
factory = method.invoke(binder, false, "foo-2", ecp, "foo.consumer", "foo");
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("earliest");
@@ -104,7 +104,7 @@ public class KafkaBinderUnitTests {
// binder level setting
binderConfigurationProperties.setConfiguration(Collections
.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
factory = method.invoke(binder, false, "foo-3", ecp, "foo.consumer");
factory = method.invoke(binder, false, "foo-3", ecp, "foo.consumer", "foo");
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("latest");
@@ -112,7 +112,7 @@ public class KafkaBinderUnitTests {
// consumer level setting
consumerProps.setConfiguration(Collections
.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
factory = method.invoke(binder, false, "foo-4", ecp, "foo.consumer");
factory = method.invoke(binder, false, "foo-4", ecp, "foo.consumer", "foo");
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("earliest");
@@ -237,7 +237,7 @@ public class KafkaBinderUnitTests {
@Override
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties, String beanName) {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties, String beanName, String destination) {
return new ConsumerFactory<byte[], byte[]>() {

View File

@@ -91,10 +91,10 @@ public class KafkaTransactionTests {
@Override
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName, String destination) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = spy(
super.getProducerFactory(transactionIdPrefix,
producerProperties, beanName));
producerProperties, beanName, destination));
willReturn(mockProducer).given(producerFactory).createProducer("foo-");
return producerFactory;
}

View File

@@ -100,7 +100,9 @@ public class KafkaConfigCustomizationTests {
@Bean
public ConsumerConfigCustomizer consumerConfigCustomizer() {
return consumerProperties -> {
return (consumerProperties, binding, destination) -> {
assertThat(binding).isEqualTo("process-in-0");
assertThat(destination).isEqualTo("process-in-0");
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("foo.bean", foo());
};
@@ -108,7 +110,9 @@ public class KafkaConfigCustomizationTests {
@Bean
public ProducerConfigCustomizer producerConfigCustomizer() {
return producerProperties -> {
return (producerProperties, binding, destination) -> {
assertThat(binding).isEqualTo("process-out-0");
assertThat(destination).isEqualTo("process-out-0");
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("foo.bean", foo());
};