diff --git a/pom.xml b/pom.xml index 47e17fbb..cb45e73d 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 17 3.0.0-SNAPSHOT 6.0.0-SNAPSHOT - 3.1.0 + 3.0.0 4.0.0-SNAPSHOT true true diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index c0388769..7d851cff 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -31,6 +31,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -38,6 +39,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolv import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -141,19 +143,18 @@ public class KafkaStreamsBinderBootstrapTest { assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse(); applicationContext.getBean(KeyValueSerdeResolver.class); - //TODO: In Kafka Streams 3.1, taskTopology field is removed. Re-evaluate this testing strategy. -// String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) -// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); -// -// assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); -// -// String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) -// .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); -// assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); -// -// String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) -// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); -// assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); + String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); + + assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); + + String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); + assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); + + String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); + assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); applicationContext.close(); }