GH-968: Propagate Application Context

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/968
This commit is contained in:
Gary Russell
2020-10-12 13:43:02 -04:00
parent 200a5efb64
commit 829d1c651a
2 changed files with 13 additions and 5 deletions

View File

@@ -79,6 +79,7 @@ import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
@@ -414,17 +415,19 @@ public class KafkaMessageChannelBinder extends
if (StringUtils.hasText(producerProperties.getExtension().getRecordMetadataChannel())) {
handler.setSendSuccessChannelName(producerProperties.getExtension().getRecordMetadataChannel());
}
AbstractApplicationContext applicationContext = getApplicationContext();
handler.setApplicationContext(applicationContext);
KafkaHeaderMapper mapper = null;
if (this.configurationProperties.getHeaderMapperBeanName() != null) {
mapper = getApplicationContext().getBean(
mapper = applicationContext.getBean(
this.configurationProperties.getHeaderMapperBeanName(),
KafkaHeaderMapper.class);
}
if (mapper == null) {
//First, try to see if there is a bean named headerMapper registered by other frameworks using the binder (for e.g. spring cloud sleuth)
try {
mapper = getApplicationContext().getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
mapper = applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
}
catch (BeansException be) {
// Pass through
@@ -654,13 +657,15 @@ public class KafkaMessageChannelBinder extends
};
messageListenerContainer.setConcurrency(concurrency);
// these won't be needed if the container is made a bean
AbstractApplicationContext applicationContext = getApplicationContext();
messageListenerContainer.setApplicationContext(applicationContext);
if (getApplicationEventPublisher() != null) {
messageListenerContainer
.setApplicationEventPublisher(getApplicationEventPublisher());
}
else if (getApplicationContext() != null) {
else if (applicationContext != null) {
messageListenerContainer
.setApplicationEventPublisher(getApplicationContext());
.setApplicationEventPublisher(applicationContext);
}
messageListenerContainer.setBeanName(destination + ".container");
// end of these won't be needed...
@@ -685,7 +690,8 @@ public class KafkaMessageChannelBinder extends
extendedConsumerProperties.isBatchMode() ? ListenerMode.batch : ListenerMode.record);
MessagingMessageConverter messageConverter = getMessageConverter(extendedConsumerProperties);
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
kafkaMessageDrivenChannelAdapter.setBeanFactory(getBeanFactory());
kafkaMessageDrivenChannelAdapter.setApplicationContext(applicationContext);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
consumerGroup, extendedConsumerProperties);
if (!extendedConsumerProperties.isBatchMode()

View File

@@ -615,6 +615,8 @@ public class KafkaBinderTests extends
moduleOutputChannel, outputBindingProperties.getProducer());
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
"testSendAndReceive", moduleInputChannel, consumerProperties);
assertThat(KafkaTestUtils.getPropertyValue(consumerBinding,
"lifecycle.messageListenerContainer.applicationContext")).isNotNull();
Message<?> message = org.springframework.integration.support.MessageBuilder
.withPayload("foo".getBytes(StandardCharsets.UTF_8))
.setHeader(MessageHeaders.CONTENT_TYPE,