GH-1043: Support Custom BatchMessageConverter

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1043
This commit is contained in:
Gary Russell
2021-03-24 13:23:53 -04:00
parent f25dbff2b7
commit 0ea4315af8
2 changed files with 34 additions and 11 deletions

View File

@@ -114,7 +114,9 @@ import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
@@ -725,7 +727,7 @@ public class KafkaMessageChannelBinder extends
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer,
extendedConsumerProperties.isBatchMode() ? ListenerMode.batch : ListenerMode.record);
MessagingMessageConverter messageConverter = getMessageConverter(extendedConsumerProperties);
MessageConverter messageConverter = getMessageConverter(extendedConsumerProperties);
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
kafkaMessageDrivenChannelAdapter.setBeanFactory(getBeanFactory());
kafkaMessageDrivenChannelAdapter.setApplicationContext(applicationContext);
@@ -744,7 +746,8 @@ public class KafkaMessageChannelBinder extends
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
(record, exception) -> {
MessagingException payload =
new MessagingException(messageConverter.toMessage(record, null, null, null),
new MessagingException(((RecordMessageConverter) messageConverter)
.toMessage(record, null, null, null),
"Transaction rollback limit exceeded", exception);
try {
errorInfrastructure.getErrorChannel()
@@ -1003,7 +1006,10 @@ public class KafkaMessageChannelBinder extends
KafkaMessageSource<?, ?> source = new KafkaMessageSource<>(consumerFactory,
consumerProperties);
source.setMessageConverter(getMessageConverter(extendedConsumerProperties));
MessageConverter messageConverter = getMessageConverter(extendedConsumerProperties);
Assert.isInstanceOf(RecordMessageConverter.class, messageConverter,
"'messageConverter' must be a 'RecordMessageConverter' for polled consumers");
source.setMessageConverter((RecordMessageConverter) messageConverter);
source.setRawMessageHeader(extension.isEnableDlq());
if (!extendedConsumerProperties.isMultiplex()) {
@@ -1040,32 +1046,35 @@ public class KafkaMessageChannelBinder extends
});
}
private MessagingMessageConverter getMessageConverter(
private MessageConverter getMessageConverter(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
MessagingMessageConverter messageConverter;
MessageConverter messageConverter;
if (extendedConsumerProperties.getExtension().getConverterBeanName() == null) {
messageConverter = new MessagingMessageConverter();
MessagingMessageConverter mmc = new MessagingMessageConverter();
StandardHeaders standardHeaders = extendedConsumerProperties.getExtension()
.getStandardHeaders();
messageConverter
.setGenerateMessageId(StandardHeaders.id.equals(standardHeaders)
mmc.setGenerateMessageId(StandardHeaders.id.equals(standardHeaders)
|| StandardHeaders.both.equals(standardHeaders));
messageConverter.setGenerateTimestamp(
mmc.setGenerateTimestamp(
StandardHeaders.timestamp.equals(standardHeaders)
|| StandardHeaders.both.equals(standardHeaders));
messageConverter = mmc;
}
else {
try {
messageConverter = getApplicationContext().getBean(
extendedConsumerProperties.getExtension().getConverterBeanName(),
MessagingMessageConverter.class);
MessageConverter.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new IllegalStateException(
"Converter bean not present in application context", ex);
}
}
messageConverter.setHeaderMapper(getHeaderMapper(extendedConsumerProperties));
if (messageConverter instanceof MessagingMessageConverter) {
((MessagingMessageConverter) messageConverter).setHeaderMapper(getHeaderMapper(extendedConsumerProperties));
}
return messageConverter;
}

View File

@@ -127,6 +127,7 @@ import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@@ -612,6 +613,11 @@ public class KafkaBinderTests extends
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
MessagingMessageConverter mmc = new MessagingMessageConverter();
((GenericApplicationContext) ((KafkaTestBinder) binder).getApplicationContext())
.registerBean("tSARmmc", MessagingMessageConverter.class, () -> mmc);
consumerProperties.getExtension().setConverterBeanName("tSARmmc");
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.bar",
moduleOutputChannel, outputBindingProperties.getProducer());
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
@@ -653,6 +659,8 @@ public class KafkaBinderTests extends
assertThat(topic.isConsumerTopic()).isTrue();
assertThat(topic.getConsumerGroup()).isEqualTo("testSendAndReceive");
assertThat(KafkaTestUtils.getPropertyValue(consumerBinding, "lifecycle.recordListener.messageConverter"))
.isSameAs(mmc);
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -670,6 +678,10 @@ public class KafkaBinderTests extends
consumerProperties.getExtension().getConfiguration().put("fetch.min.bytes", "1000");
consumerProperties.getExtension().getConfiguration().put("fetch.max.wait.ms", "5000");
consumerProperties.getExtension().getConfiguration().put("max.poll.records", "2");
BatchMessagingMessageConverter bmmc = new BatchMessagingMessageConverter();
((GenericApplicationContext) ((KafkaTestBinder) binder).getApplicationContext())
.registerBean("tSARBbmmc", BatchMessagingMessageConverter.class, () -> bmmc);
consumerProperties.getExtension().setConverterBeanName("tSARBbmmc");
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
@@ -709,6 +721,8 @@ public class KafkaBinderTests extends
assertThat(payload.get(1)).isEqualTo("bar".getBytes());
}
assertThat(KafkaTestUtils.getPropertyValue(consumerBinding, "lifecycle.batchListener.batchMessageConverter"))
.isSameAs(bmmc);
producerBinding.unbind();
consumerBinding.unbind();
}