From 406e20f19cb92491dfcba8afb81ef5badbad5514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduard=20Dom=C3=ADnguez?= Date: Fri, 10 Dec 2021 12:26:17 +0100 Subject: [PATCH] Fix: KeySerde setup not using expected key type headers checkstyle fixes --- .../binder/kafka/streams/KeyValueSerdeResolver.java | 4 ++-- .../bootstrap/KafkaStreamsBinderBootstrapTest.java | 12 +++++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java index fc925dc9..b23d5139 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java @@ -243,7 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { else { keySerde = getFallbackSerde("default.key.serde"); } - keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false); + keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true); } catch (ClassNotFoundException ex) { throw new IllegalStateException("Serde class not found: ", ex); @@ -268,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { keySerde = Serdes.ByteArray(); } } - keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false); + keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true); } catch (ClassNotFoundException ex) { throw new IllegalStateException("Serde class not found: ", ex); diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index 3bf29b69..7d851cff 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -39,6 +39,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolv import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -146,6 +147,15 @@ public class KafkaStreamsBinderBootstrapTest { .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); + + String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); + assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); + + String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); + assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); + applicationContext.close(); } @@ -164,7 +174,7 @@ public class KafkaStreamsBinderBootstrapTest { } @Bean - public Consumer>> input2() { + public Consumer, Map>> input2() { return s -> { // No-op consumer };