Artem Bilan
2019-10-23 14:05:42 -04:00
committed by Gary Russell
parent 1ce1d7918f
commit 5794fb983c
2 changed files with 22 additions and 1 deletions

View File

@@ -42,12 +42,14 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProv
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
@@ -104,6 +106,7 @@ public class KafkaBinderConfiguration {
KafkaTopicProvisioner provisioningProvider,
@Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer,
@Nullable MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer,
@Nullable ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> messageHandlerCustomizer,
ObjectProvider<KafkaBindingRebalanceListener> rebalanceListener,
ObjectProvider<DlqPartitionFunction> dlqPartitionFunction) {
@@ -114,6 +117,7 @@ public class KafkaBinderConfiguration {
kafkaMessageChannelBinder.setProducerListener(this.producerListener);
kafkaMessageChannelBinder
.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
kafkaMessageChannelBinder.setProducerMessageHandlerCustomizer(messageHandlerCustomizer);
return kafkaMessageChannelBinder;
}

View File

@@ -41,9 +41,12 @@ import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@@ -58,6 +61,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Oleg Zhurakousky
* @author Jon Schneider
* @author Gary Russell
*
* @since 2.0
*/
@RunWith(SpringRunner.class)
@@ -126,10 +130,18 @@ public class KafkaBinderActuatorTests {
consumerBindings.get("source").get(0)).getPropertyValue(
"lifecycle.beanName"))
.isEqualTo("setByCustomizer:source");
Map<String, Binding<MessageChannel>> producerBindings = (Map<String, Binding<MessageChannel>>) channelBindingServiceAccessor
.getPropertyValue("producerBindings");
assertThat(new DirectFieldAccessor(
producerBindings.get("output")).getPropertyValue(
"lifecycle.beanName"))
.isEqualTo("setByCustomizer:output");
});
}
@EnableBinding({ Sink.class, PMS.class })
@EnableBinding({ Processor.class, PMS.class })
@EnableAutoConfiguration
public static class KafkaMetricsTestConfig {
@@ -143,6 +155,11 @@ public class KafkaBinderActuatorTests {
return (s, q, g) -> s.setBeanName("setByCustomizer:" + q);
}
@Bean
public ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> handlerCustomizer() {
return (handler, destinationName) -> handler.setBeanName("setByCustomizer:" + destinationName);
}
@StreamListener(Sink.INPUT)
public void process(@SuppressWarnings("unused") String payload) throws InterruptedException {
// Artificial slow listener to emulate consumer lag