Fix: KeySerde setup not using expected key type headers
checkstyle fixes
This commit is contained in:
committed by
Soby Chacko
parent
da9bc354e4
commit
e512b7a2c6
@@ -243,7 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
else {
|
||||
keySerde = getFallbackSerde("default.key.serde");
|
||||
}
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
@@ -268,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
keySerde = Serdes.ByteArray();
|
||||
}
|
||||
}
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
|
||||
@@ -39,6 +39,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolv
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
@@ -146,6 +147,15 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
|
||||
|
||||
assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
|
||||
|
||||
String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
|
||||
.getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
|
||||
assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
|
||||
|
||||
String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
|
||||
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
|
||||
assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);
|
||||
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@@ -164,7 +174,7 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Consumer<KTable<Object, Map<String, String>>> input2() {
|
||||
public Consumer<KTable<Map<String, String>, Map<String, String>>> input2() {
|
||||
return s -> {
|
||||
// No-op consumer
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user