From 829d1c651a956945fe2c8fac21e4a2520ef6af62 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 12 Oct 2020 13:43:02 -0400 Subject: [PATCH] GH-968: Propagate Application Context Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/968 --- .../binder/kafka/KafkaMessageChannelBinder.java | 16 +++++++++++----- .../stream/binder/kafka/KafkaBinderTests.java | 2 ++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index caeaf722..4ee82da7 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -79,6 +79,7 @@ import org.springframework.cloud.stream.config.MessageSourceCustomizer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.context.Lifecycle; +import org.springframework.context.support.AbstractApplicationContext; import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.standard.SpelExpressionParser; @@ -414,17 +415,19 @@ public class KafkaMessageChannelBinder extends if (StringUtils.hasText(producerProperties.getExtension().getRecordMetadataChannel())) { handler.setSendSuccessChannelName(producerProperties.getExtension().getRecordMetadataChannel()); } + AbstractApplicationContext applicationContext = getApplicationContext(); + handler.setApplicationContext(applicationContext); KafkaHeaderMapper mapper = null; if (this.configurationProperties.getHeaderMapperBeanName() != null) { - mapper = getApplicationContext().getBean( + mapper = applicationContext.getBean( this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class); } if (mapper == null) { //First, try to see if there is a bean named headerMapper registered by other frameworks using the binder (for e.g. spring cloud sleuth) try { - mapper = getApplicationContext().getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class); + mapper = applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class); } catch (BeansException be) { // Pass through @@ -654,13 +657,15 @@ public class KafkaMessageChannelBinder extends }; messageListenerContainer.setConcurrency(concurrency); // these won't be needed if the container is made a bean + AbstractApplicationContext applicationContext = getApplicationContext(); + messageListenerContainer.setApplicationContext(applicationContext); if (getApplicationEventPublisher() != null) { messageListenerContainer .setApplicationEventPublisher(getApplicationEventPublisher()); } - else if (getApplicationContext() != null) { + else if (applicationContext != null) { messageListenerContainer - .setApplicationEventPublisher(getApplicationContext()); + .setApplicationEventPublisher(applicationContext); } messageListenerContainer.setBeanName(destination + ".container"); // end of these won't be needed... @@ -685,7 +690,8 @@ public class KafkaMessageChannelBinder extends extendedConsumerProperties.isBatchMode() ? ListenerMode.batch : ListenerMode.record); MessagingMessageConverter messageConverter = getMessageConverter(extendedConsumerProperties); kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter); - kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory()); + kafkaMessageDrivenChannelAdapter.setBeanFactory(getBeanFactory()); + kafkaMessageDrivenChannelAdapter.setApplicationContext(applicationContext); ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup, extendedConsumerProperties); if (!extendedConsumerProperties.isBatchMode() diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 4604b80e..c5b2337b 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -615,6 +615,8 @@ public class KafkaBinderTests extends moduleOutputChannel, outputBindingProperties.getProducer()); Binding consumerBinding = binder.bindConsumer("foo.bar", "testSendAndReceive", moduleInputChannel, consumerProperties); + assertThat(KafkaTestUtils.getPropertyValue(consumerBinding, + "lifecycle.messageListenerContainer.applicationContext")).isNotNull(); Message message = org.springframework.integration.support.MessageBuilder .withPayload("foo".getBytes(StandardCharsets.UTF_8)) .setHeader(MessageHeaders.CONTENT_TYPE,