Fix: KeySerde setup not using expected key type headers

checkstyle fixes
This commit is contained in:
Eduard Domínguez
2021-12-10 12:26:17 +01:00
committed by Soby Chacko
parent 648188fc6b
commit 406e20f19c
2 changed files with 13 additions and 3 deletions

View File

@@ -243,7 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
else {
keySerde = getFallbackSerde("default.key.serde");
}
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
@@ -268,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
keySerde = Serdes.ByteArray();
}
}
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);

View File

@@ -39,6 +39,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolv
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -146,6 +147,15 @@ public class KafkaStreamsBinderBootstrapTest {
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
.getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);
applicationContext.close();
}
@@ -164,7 +174,7 @@ public class KafkaStreamsBinderBootstrapTest {
}
@Bean
public Consumer<KTable<Object, Map<String, String>>> input2() {
public Consumer<KTable<Map<String, String>, Map<String, String>>> input2() {
return s -> {
// No-op consumer
};