SGH-1616: Add MessageSourceCustomizer

Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/1616
This commit is contained in:
Gary Russell
2019-02-27 13:15:35 -05:00
committed by Soby Chacko
parent d80e66d9b8
commit c5c81f8148
3 changed files with 42 additions and 6 deletions

View File

@@ -71,6 +71,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerPro
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
@@ -195,7 +196,7 @@ public class KafkaMessageChannelBinder extends
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
this(configurationProperties, provisioningProvider, null, null);
this(configurationProperties, provisioningProvider, null, null, null);
}
public KafkaMessageChannelBinder(
@@ -204,8 +205,18 @@ public class KafkaMessageChannelBinder extends
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer,
KafkaBindingRebalanceListener rebalanceListener) {
this(configurationProperties, provisioningProvider, containerCustomizer, null, rebalanceListener);
}
public KafkaMessageChannelBinder(
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider,
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer,
MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer,
KafkaBindingRebalanceListener rebalanceListener) {
super(headersToMap(configurationProperties), provisioningProvider,
containerCustomizer);
containerCustomizer, sourceCustomizer);
this.configurationProperties = configurationProperties;
if (StringUtils.hasText(
configurationProperties.getTransaction().getTransactionIdPrefix())) {
@@ -752,6 +763,7 @@ public class KafkaMessageChannelBinder extends
}
});
getMessageSourceCustomizer().configure(source, destination.getName(), group);
return new PolledConsumerResources(source, registerErrorInfrastructure(
destination, group, consumerProperties, true));
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,11 +40,13 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfi
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
@@ -100,11 +102,12 @@ public class KafkaBinderConfiguration {
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider,
@Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer,
@Nullable MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer,
ObjectProvider<KafkaBindingRebalanceListener> rebalanceListener) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider,
listenerContainerCustomizer, rebalanceListener.getIfUnique());
listenerContainerCustomizer, sourceCustomizer, rebalanceListener.getIfUnique());
kafkaMessageChannelBinder.setProducerListener(this.producerListener);
kafkaMessageChannelBinder
.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);

View File

@@ -34,12 +34,16 @@ import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@@ -52,6 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
* @author Gary Russell
* @since 2.0
*/
@RunWith(SpringRunner.class)
@@ -115,10 +120,14 @@ public class KafkaBinderActuatorTests {
consumerBindings.get("input").get(0)).getPropertyValue(
"lifecycle.messageListenerContainer.beanName"))
.isEqualTo("setByCustomizer:input");
assertThat(new DirectFieldAccessor(
consumerBindings.get("source").get(0)).getPropertyValue(
"lifecycle.beanName"))
.isEqualTo("setByCustomizer:source");
});
}
@EnableBinding(Sink.class)
@EnableBinding({ Sink.class, PMS.class })
@EnableAutoConfiguration
public static class KafkaMetricsTestConfig {
@@ -127,12 +136,24 @@ public class KafkaBinderActuatorTests {
return (c, q, g) -> c.setBeanName("setByCustomizer:" + q);
}
@Bean
public MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer() {
return (s, q, g) -> s.setBeanName("setByCustomizer:" + q);
}
@StreamListener(Sink.INPUT)
public void process(String payload) throws InterruptedException {
public void process(@SuppressWarnings("unused") String payload) throws InterruptedException {
// Artificial slow listener to emulate consumer lag
Thread.sleep(1000);
}
}
public interface PMS {
@Input
PollableMessageSource source();
}
}